RxJava Fetching Observables In Parallel


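I need to implement parallel asynchronous calls in RxJava. I picked a simple use case in which the first call fetches (searches for) a list of products (Tile) to display. The subsequent calls then fetch (A) the reviews and (B) the product images.

After several attempts, this is where I ended up.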

 1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
 2    List<Tile> allTiles = new ArrayList<Tile>();
 3    ClientResponse response = new ClientResponse();

 4    searchTile.parallel(oTile -> {
 5      return oTile.flatMap(t -> {
 6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
 7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

 8        return Observable.zip(reviews, imageUrl, (r, u) -> {
 9          t.setReviews(r);
10          t.setImageUrl(u);

11          return t;
12        });

13      });
14    }).subscribe(e -> {
15      allTiles.add((Tile) e);
16    });

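Line 1: fetches the products (Tile) to display

Line 4: we take the list of Observables and shard it to fetch the reviews and image URLs

Lines 6, 7: fetch the reviews Observable and the image URL Observable

Line 8: the two observables are zipped together, returning an updated Observable

Line 15: finally, line 15 collects all the individual products into a collection for display, which can be returned to the calling layer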

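Although the Observable has been sharded and in our tests runs on 4 different threads, fetching the reviews and images seems to happen one after the other. I suspect that the zip step on line 8 is essentially causing the 2 observables (reviews and URL) to be invoked sequentially.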

[Image: Fiddler network timeline (waterfall chart) of the review and image calls]

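Does anyone here have a suggestion for fetching the reviews and image URLs concurrently? In essence, the waterfall chart in the attached image should be stacked more vertically: the calls for reviews and images should run in parallel.

Thanks, Anand Raman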


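How did you generate the transfer timeline chart? It looks really cool and useful. I'd like to use it too. - Aravind Yarram
Since my system makes external calls, I simply proxy those calls through Fiddler. Fiddler has an option to generate a network timeline, and that is essentially the view you are looking at. Once Fiddler is set up to proxy the requests, just select the sessions you are interested in and click the Timeline tab in the right pane. Thanks, Anand. - diduknow
2 Answers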


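The parallel operator proved to be a problem for almost all use cases and does not do what most people expect from it, so it was removed in the 1.0.0.rc.4 release: https://github.com/ReactiveX/RxJava/pull/1716

A good example of how to achieve this type of behavior and get parallel execution can be seen here.

In your example code it is unclear whether searchServiceClient is synchronous or asynchronous. That affects how to solve the problem slightly: if it is already asynchronous, no extra scheduling is needed; if it is synchronous, extra scheduling is needed.

First, here are some simple examples showing synchronous and asynchronous behavior: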

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(i);
            s.onCompleted();
        });
    }
}
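For readers who want to see the same effect without RxJava on the classpath, here is a rough JDK-only analogue of `mergingSync` versus `mergingSyncMadeAsync`. It swaps in `java.util.concurrent.CompletableFuture` for RxJava, so it illustrates the idea rather than the RxJava API. `fetch` is a hypothetical stand-in for a blocking IO call with about 300 ms of latency, and the cached thread pool plays roughly the role that `Schedulers.io()` plays for `subscribeOn`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SyncVsAsync {

    // Hypothetical stand-in for a blocking IO call with ~300 ms latency.
    static int fetch(int i) {
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return i;
    }

    // Like mergingSync: each call blocks the caller, so the latencies add up.
    static long sequentialMillis() {
        long start = System.nanoTime();
        fetch(1);
        fetch(2);
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Like mergingSyncMadeAsync: both calls are scheduled on a pool and overlap.
    static long concurrentMillis(ExecutorService io) {
        long start = System.nanoTime();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> fetch(1), io);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> fetch(2), io);
        CompletableFuture.allOf(a, b).join(); // wait for both, similar to toBlocking()
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            System.out.println("sequential: " + sequentialMillis() + "ms");
            System.out.println("concurrent: " + concurrentMillis(io) + "ms");
        } finally {
            io.shutdown();
        }
    }
}
```

On an otherwise idle machine the sequential version should report roughly two latencies (about 600 ms) and the pooled version roughly one (about 300 ms), which is the same difference the RxJava examples show with 1000 ms calls.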

Here is an example that tries to match your code more closely:
import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample {

    public static void main(String[] args) {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(() -> logTime("Search started ", startTime))
                .doOnCompleted(() -> logTime("Search completed ", startTime));

        Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
            Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                    .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
            Observable<String> imageUrl = getProductImage(t.getProductId())
                    .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

            return Observable.zip(reviews, imageUrl, (r, u) -> {
                return new TileResponse(t, r, u);
            }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
        });

        List<TileResponse> allTiles = populatedTiles.toList()
                .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                .toBlocking().single();
    }

    private static Observable<Tile> getSearchResults(String string) {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id) {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id) {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime) {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(T... ts) {
        return Observable.create((Subscriber<? super T> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
            for (T t : ts) {
                s.onNext(t);
            }
            s.onCompleted();
        }).subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse {

        public TileResponse(Tile t, Reviews r, String u) {
            // store the values
        }

    }

    public static class Tile {

        private final int id;

        public Tile(int i) {
            this.id = i;
        }

        public int getSellerId() {
            return id;
        }

        public int getProductId() {
            return id;
        }

    }

    public static class Reviews {

    }
}

This outputs:
Search started  => 65ms
Search completed  => 1094ms
getProductImage[1] completed  => 2095ms
getSellerReviews[2] completed  => 2095ms
getProductImage[3] completed  => 2095ms
zip[1] completed  => 2096ms
zip[2] completed  => 2096ms
getProductImage[2] completed  => 2096ms
getSellerReviews[1] completed  => 2096ms
zip[3] completed  => 2096ms
All Tiles Completed  => 2097ms
getSellerReviews[3] completed  => 2097ms

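I have simulated each IO call as taking 1000ms so it is obvious where the latency is and that the calls happen in parallel. It prints the number of milliseconds elapsed.
The key is that flatMap merges async calls, so as long as the Observables being merged are async, they will all be executed concurrently.
If a call like getProductImage(t.getProductId()) is synchronous, it can be made asynchronous like this: getProductImage(t.getProductId()).subscribeOn(Schedulers.io()).
Here are the important parts of the above example, without all the logging and boilerplate types: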
    Observable<Tile> searchTile = getSearchResults("search term");

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();
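For comparison, the same shape (one search, then a zip of two lookups per tile, then a single list at the end) can be sketched with JDK `CompletableFuture`. This is an analogy rather than RxJava: `thenCompose` stands in for chaining the per-tile work onto the search, `thenCombine` for `zip`, and `allOf` plus `join` for `toList().toBlocking().single()`. The `slow` helper, the integer tile ids and the `String` reviews are hypothetical mocks (about 200 ms each), not the asker's real clients.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class FanOutExample {

    static final class TileResponse {
        final int tileId;
        final String reviews;
        final String imageUrl;

        TileResponse(int tileId, String reviews, String imageUrl) {
            this.tileId = tileId;
            this.reviews = reviews;
            this.imageUrl = imageUrl;
        }
    }

    // Hypothetical mock client: sleeps ~200 ms to simulate IO, then returns the value.
    static <T> T slow(T value) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // For one tile, start both lookups at once and combine them (like zip).
    static CompletableFuture<TileResponse> populateTile(int id, ExecutorService io) {
        CompletableFuture<String> reviews = CompletableFuture.supplyAsync(() -> slow("reviews_" + id), io);
        CompletableFuture<String> image = CompletableFuture.supplyAsync(() -> slow("image_" + id), io);
        return reviews.thenCombine(image, (r, u) -> new TileResponse(id, r, u));
    }

    // Wait for every tile and keep the search order (like toList()).
    static CompletableFuture<List<TileResponse>> allOfList(List<CompletableFuture<TileResponse>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    static List<TileResponse> populate(ExecutorService io) {
        // The search itself is async; each id stands in for a Tile.
        CompletableFuture<List<Integer>> search =
                CompletableFuture.supplyAsync(() -> slow(Arrays.asList(1, 2, 3)), io);
        CompletableFuture<List<TileResponse>> all = search.thenCompose(ids -> allOfList(
                ids.stream().map(id -> populateTile(id, io)).collect(Collectors.toList())));
        return all.join(); // block once at the end, like toBlocking().single()
    }

    public static void main(String[] args) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            long start = System.nanoTime();
            List<TileResponse> tiles = populate(io);
            long ms = (System.nanoTime() - start) / 1_000_000;
            tiles.forEach(t -> System.out.println(t.tileId + " " + t.reviews + " " + t.imageUrl));
            System.out.println("elapsed: " + ms + "ms");
        } finally {
            io.shutdown();
        }
    }
}
```

Because every lookup is submitted to the pool before anything waits on it, the elapsed time should come out near two latencies (the search, then all six overlapping lookups) instead of the seven a fully sequential version would need, mirroring the roughly 2000 ms total in the RxJava output above.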

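Thanks @benjchristensen for the great answer. It brought clarity and solved my problem. Thanks also for pointing me to the rich library of examples at [https://github.com/benjchristensen/ReactiveLab]. I'll dig into it over the weekend. - diduknow
What is the purpose of the doOnXXX() methods? - Aravind Yarram
@Pangea, I think the purpose of those calls is to print output as the events happen, so you can see that it is working in parallel. - ivant
This is a great answer! - JakeWilson801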
I don't think synchronous calls such as mergingSync need toBlocking; it is only needed when making asynchronous calls. - Ravindra Ranwala


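For those still on JDK 7, whose IDE does not yet auto-detect JDK 8 source, and who want to try out the brilliant response (and explanation) above by @benjchristensen, here is a shamelessly refactored JDK 7 version of the code. Kudos to @benjchristensen for an excellent explanation and example!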

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample
{

    public static void main(String[] args)
    {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("Search started ", startTime);
                            }
                })
                .doOnCompleted(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("Search completed ", startTime);
                            }
                });
        Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>()
        {

            @Override
            public Observable<TileResponse> call(final Tile t)
            {
                Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("getSellerReviews[" + t.id + "] completed ", startTime);
                                    }
                        });
                Observable<String> imageUrl = getProductImage(t.getProductId())
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("getProductImage[" + t.id + "] completed ", startTime);
                                    }
                        });
                return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>()
                {

                    @Override
                    public TileResponse call(Reviews r, String u)
                    {
                        return new TileResponse(t, r, u);
                    }
                })
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("zip[" + t.id + "] completed ", startTime);
                                    }
                        });
            }
        });

        List<TileResponse> allTiles = populatedTiles
                .toList()
                .doOnCompleted(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("All Tiles Completed ", startTime);
                            }
                })
                .toBlocking()
                .single();
    }

    private static Observable<Tile> getSearchResults(String string)
    {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id)
    {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id)
    {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime)
    {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(final T... ts)
    {
        return Observable.create(new Observable.OnSubscribe<T>()
        {

            @Override
            public void call(Subscriber<? super T> s)
            {
                try
                {
                    Thread.sleep(1000);
                }
                catch (Exception e)
                {
                }
                for (T t : ts)
                {
                    s.onNext(t);
                }
                s.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse
    {

        public TileResponse(Tile t, Reviews r, String u)
        {
            // store the values
        }

    }

    public static class Tile
    {

        private final int id;

        public Tile(int i)
        {
            this.id = i;
        }

        public int getSellerId()
        {
            return id;
        }

        public int getProductId()
        {
            return id;
        }

    }

    public static class Reviews
    {

    }
} 
