RxJava组合多个请求的序列

34

问题

我有两个 APIs。API 1 给我一个 Items 列表,API 2 为我提供了从 API 1 获取的每个项目的更详细信息。到目前为止,我解决这个问题的方式导致了性能不佳。

问题

如何使用 Retrofit 和 RxJava 解决这个问题,使其更有效率和快速。

我的方法

目前我的解决方案是:

第一步:Retrofit 执行从 API 1 获取的 Single<ArrayList<Information>>

第二步:我遍历这些 Items,并对每个 Item 向 API 2 发出请求。

第三步:Retrofit 返回按顺序执行每个项目的 Single<ExtendedInformation>

第四步:在 API 2 的所有调用完全执行后,我为所有项目创建一个新对象,组合了 Information 和 Extended Information。

我的代码

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

1
为什么你的方法需要“同步化”? - bric3
@Brice 以前它们需要同步,我忘记移除了。谢谢 :) - Mayr Technologies
你有点让自己处于这种状态。问题:1. 是否真的需要逐行查询?(通常批量查询更有效率)。2. 你是否真的必须等待所有结果到达才能继续进行?你可以选择在块到达时立即显示结果吗? - M. Prokhorov
5个回答

43

tl;dr 使用 concatMapEagerflatMap 并异步地执行子调用或在调度程序上执行。


长话短说

我不是安卓开发者,所以我的问题将仅限于纯 RxJava(版本 1 和版本 2)。

如果我理解正确,所需的流程为:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

假设 Retrofit 生成了客户端用于

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

如果物品的顺序不重要,那么可以只使用flatMap

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

仅当retrofit builder被配置为:

  • 使用异步适配器(调用将排队在okhttp内部执行器中)。我个人认为这不是一个好主意,因为你没有控制权。

.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
  • 或使用基于调度程序的适配器(调用将在RxJava调度程序上进行安排)。这是我首选的选项,因为您可以明确选择要使用的调度程序,最有可能使用的是IO调度程序,但您可以尝试不同的调度程序。

  • .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    

    原因是flatMap将订阅由api2.extendedInfo(...)创建的每个observable,并将它们合并到生成的observable中。因此,结果将按接收顺序出现。

    如果retrofit客户端未设置为异步或设置为在调度程序上运行,则可以设置一个:

    api1.items(queryParam)
        .flatMap(itemList -> Observable.fromIterable(itemList)))
        .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
        .subscribe(...)
    

    这个结构与之前的几乎相同,唯一不同的是它指示每个api2.extendedInfo在哪个调度器上本地运行。

    可以通过调整flatMapmaxConcurrency参数来控制同时执行的请求数量。虽然我建议小心使用,因为不希望同时运行所有查询。通常默认值maxConcurrency足够好(128)。

    如果原始查询顺序很重要concatMap通常是以顺序但按顺序执行相同操作的运算符,但如果代码需要等待所有子查询完成,则速度较慢。然而,解决方案更进一步,使用concatMapEager,它将按顺序订阅可观察对象,并根据需要缓冲结果。

    假设retrofit客户端是异步或在特定调度程序上运行:

    api1.items(queryParam)
        .flatMap(itemList -> Observable.fromIterable(itemList)))
        .concatMapEager(item -> api2.extendedInfo(item.id()))
        .subscribe(...)
    

    或者如果需要在本地设置调度程序:

    api1.items(queryParam)
        .flatMap(itemList -> Observable.fromIterable(itemList)))
        .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
        .subscribe(...)
    

    在这个操作符中,调整并发度也是可能的。


    另外,如果 API 返回 Flowable,在 RxJava 2.1.7 中仍处于测试版的 .parallel 可以使用。但结果不是按顺序排列的,我还不知道有没有办法在不排序的情况下对它们进行排序(目前?)。

    api.items(queryParam) // Flowable<Item>
       .parallel(10)
       .runOn(Schedulers.io())
       .map(item -> api2.extendedInfo(item.id()))
       .sequential();     // Flowable<ItemExtended>
    

    1
    item.id() 可以从 item 中获取,而 item 是在 itemList 中找到的任何项(此列表通过 .flatMap(itemList -> Observable.fromIterable(itemList)) 进行 _flat mapped_,以便将列表的每个元素作为可观察流中的单个元素进行 _pipe_)。 - bric3
    如果在调用.flatMap(item -> api2.extendedInfo(item.id()))之前不使用.flatMapIterable(itemList -> Arrays.asList(itemList.items)),它会显示一个错误。 - Mayr Technologies
    1
    如果您需要将extendedInfo累加到列表中,可以使用.toList()运算符。如果使用RxJava 2 toList返回Single<List<...>>,要获取Observable<List<...>>,可以链接.toList().toObservable() - bric3
    1
    是的,有必要对api1调用返回的列表进行“flat map”操作。可以通过.flatMap(itemList -> Observable.fromIterable(itemList)).flatMapIterable(itemList -> Arrays.asList(itemList.items))来实现(如果itemList实现了Iterable接口,则可以简化为.flatMapIterable(itemList -> itemList))。 - bric3
    1
    这是一份出色的答案,我想谢谢你花时间详细解释所有内容。关于调用适配器工厂和调度程序的解释非常有帮助。 - Matthew Bahr
    显示剩余7条评论

    7

    flatMap 运算符旨在满足这些类型的工作流。

    我将用一个简单的五步示例概述其基本原理。希望您可以轻松地在自己的代码中重建相同的原则:

    @Test fun flatMapExample() {
        // (1) constructing a fake stream that emits a list of values
        Observable.just(listOf(1, 2, 3, 4, 5))
                // (2) convert our List emission into a stream of its constituent values 
                .flatMap { numbers -> Observable.fromIterable(numbers) }
                // (3) subsequently convert each individual value emission into an Observable of some 
                //     newly calculated type
                .flatMap { number ->
                    when(number) {
                           1 -> Observable.just("A1")
                           2 -> Observable.just("B2")
                           3 -> Observable.just("C3")
                           4 -> Observable.just("D4")
                           5 -> Observable.just("E5")
                        else -> throw RuntimeException("Unexpected value for number [$number]")
                    }
                }
                // (4) collect all the final emissions into a list
                .toList()
                .subscribeBy(
                        onSuccess = {
                            // (5) handle all the combined results (in list form) here
                            println("## onNext($it)")
                        },
                        onError = { error ->
                            println("## onError(${error.message})")
                        }
                )
    }
    

    顺便说一下,如果排放的顺序很重要,可以考虑使用 concatMap

    希望这能帮到你。


    谢谢您的回答。但是我不确定如何将其应用到我的示例中。 - Mayr Technologies
    这并没有完全回答问题,例如列表不是被冻结的,而是查询结果,因此Kotlin中的when语句无法使用,甚至不能考虑不使用Kotlin的会计项目,另外它也没有说明为什么或者如何可以更快。此外,默认情况下flatMap不是并发的,因此实际上并没有加速处理速度。 - bric3
    @Brice 这个示例代码实现了毫无意义但简单的逻辑,以展示更广泛的观点。when并不是我试图展示的重点。你正确地指出它并没有展示任何特定的速度提升,但更重要的是(在我看来),它展示了习惯用法 Rx 在转换流时而不是打破它并恢复到某种形式的“回调地狱”,就像原始解决方案所做的那样。你(写得很好的)解决方案甚至承认几乎没有性能提升可言:“通常默认值已经足够好了”。 - homerman
    回调的问题确实需要解决,但这不是问题的一部分。这部分并没有暗示性能可以忽略不计。真正的技巧是在flatMapconcatMapEager的组合中异步执行子查询。引用部分涉及到maxConcurrency参数,它允许通过flatMapconcatMapEager同时订阅多少个子可观察对象。 - bric3
    @Brice,我不同意库的惯用用法不是解决方案的一部分,当发帖者要求“高效快速”的解决方案时。公平地说,我认为你和我分别解决了这两个问题。(但我喜欢你详细阐述了concatMap的排列组合)。 - homerman
    关于习惯用语的观点很中肯。但我是个挑剔的人,对此感到抱歉;when是Kotlin的构造函数,据我所知它不会异步执行任何操作,因此这并不会使代码更快,但它确实使代码更符合RxJava风格,这是好的。此外,代码抛出了一个flatMap异常,而应该返回一个Observable.error - bric3

    2

    请查看下面的演示。

    假设你需要进行多个网络调用,例如获取Github用户信息和Github用户事件。

    并且你希望在更新UI之前等待每个请求返回。 RxJava可以帮助你实现这一点。首先定义我们的Retrofit对象以访问Github的API,然后设置两个可观测对象来进行这两个网络请求调用。

    Retrofit repo = new Retrofit.Builder()
            .baseUrl("https://api.github.com")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .build();
    
    Observable<JsonObject> userObservable = repo
            .create(GitHubUser.class)
            .getUser(loginName)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
    
    Observable<JsonArray> eventsObservable = repo
            .create(GitHubEvents.class)
            .listEvents(loginName)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread());
    

    使用的界面如下:

    public interface GitHubUser {
      @GET("users/{user}")
      Observable<JsonObject> getUser(@Path("user") String user);
    }
    
    public interface GitHubEvents {
      @GET("users/{user}/events")
      Observable<JsonArray> listEvents(@Path("user") String user);
    }
    

    在使用RxJava的zip方法将两个Observables组合并等待它们完成后,我们才会创建一个新的Observable。

    Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
      @Override
      public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
        return new UserAndEvents(jsonObject, jsonElements);
      }
    });
    

    最后,让我们在新的合并Observable上调用subscribe方法:
    combined.subscribe(new Subscriber<UserAndEvents>() {
              ...
              @Override
              public void onNext(UserAndEvents o) {
                // You can access the results of the 
                // two observabes via the POJO now
              }
            });
    

    没有必要再等待线程完成网络调用了。在zip()方法中,RxJava已经为您完成了所有这些工作。 希望我的回答对你有所帮助。

    1

    我用RxJava2解决了一个类似的问题。并行执行Api 2的请求稍微加快了工作速度。

    private InformationRepository informationRepository;
    
    //init....
    
    public Single<List<FullInformation>> getFullInformation() {
        return informationRepository.getInformationList()
                .subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
                .flatMapObservable(Observable::fromIterable)
                .flatMapSingle(this::getFullInformation)
                .collect(ArrayList::new, List::add);
    
    }
    
    private Single<FullInformation> getFullInformation(Information information) {
        return informationRepository.getExtendedInformation(information)
                .map(extendedInformation -> new FullInformation(information, extendedInformation))
                .subscribeOn(Schedulers.io());//execute requests in parallel
    }
    

    InformationRepository - 只是一个接口。它的实现对我们来说不重要。

    public interface InformationRepository {
    
        Single<List<Information>> getInformationList();
    
        Single<ExtendedInformation> getExtendedInformation(Information information);
    }
    

    FullInformation - 包含结果的容器。

    public class FullInformation {
    
        private Information information;
        private ExtendedInformation extendedInformation;
    
        public FullInformation(Information information, ExtendedInformation extendedInformation) {
            this.information = information;
            this.extendedInformation = extendedInformation;
        }
    }
    

    0

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接