将一个Observable列表合并,并等待所有Observable完成

103

TL;DR 如何将Task.whenAll(List<Task>)转换为RxJava

我的现有代码使用Bolts来构建异步任务列表,并等待所有这些任务完成后执行其他步骤。本质上,它建立了一个List<Task>并返回一个单独的Task,当列表中的所有任务都完成时标记为已完成,就像在Bolts站点上的示例中所述。

我想用RxJava替换Bolts,并且我假设可以通过这种方法构建异步任务列表(预先不知道大小)并将它们全部包装成单个Observable,但我不知道该怎么做。

我尝试过查看mergezipconcat等方法...但是无法处理我所构建的List<Observable>,因为如果我正确理解文档,它们都是针对同时处理两个Observables 的情况。

我正在学习RxJava,并且对此还很陌生,所以如果这是一个明显的问题或在文档中解释了,请原谅我;我已经尝试过搜索。非常感谢您提供任何帮助。

8个回答

89

如果您有动态任务组合的情况,可以使用flatMap。就像这样:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

另一个并行执行的好例子

注意:我不太清楚您对错误处理的要求。例如,如果只有一个任务失败该怎么办。我认为您应该验证这种情况。


19
考虑到问题陈述了“当列表中的所有任务完成时”,因此这应该是被接受的答案。zip在其中一个任务完成时就会通知其完成,因此不适用。 - user3707125
1
@MyDogTom:你能用Java7语法(不是lambda版本)更新答案吗? - sanedroid
3
使用lambda表达式会使代码更易读。只需将第一个lambda表达式替换为new Func1<Observable<Boolean>, Observable<Boolean>>(),第二个lambda表达式替换为new Func1<List<Boolean>, Boolean>()即可。 - MyDogTom
@soshial RxJava 2 是 RxJava 史上最糟糕的事情,是的。 - egorikem

78

看起来你正在寻找Zip操作符

有几种不同的使用方法,让我们看一个例子。假设我们有一些不同类型的简单observable:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

等待它们全部完成的最简单方法可能是这样:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

请注意,在zip函数中,参数具有与被压缩的可观察对象的类型相对应的具体类型。

还可以直接对可观察对象列表进行压缩:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...或通过将列表包装为一个 Observable<Observable<?>>

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

不过,在这两种情况下,由于列表中观察对象的类型和数量未知,因此zip函数只能接受一个Object[]参数。这意味着zip函数必须检查参数数量并相应地进行转换。

无论如何,以上所有示例最终都会打印出1 Blah true

编辑:在使用Zip时,请确保被压缩的Observables发出相同数量的项。在上面的示例中,所有三个观察对象都发出了单个项目。如果我们将它们更改为以下内容:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

那么1,Blah,True2,Hello,True将是传递到zip函数的唯一项目。由于其他可观测对象已经完成,因此3项永远不会被压缩。


13
如果其中一个调用失败,这种方法就行不通了。在这种情况下,所有的调用都会失败。 - StarWind0
1
@StarWind0,你可以通过使用onErrorResumeNext跳过错误,例如:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty()) - vuhung3990
如果我有100个可观察对象怎么办? - Krzysztof Kubicki
处理错误的最佳方法是什么? - Hunt

23
在提出的建议中,zip()实际上将可观察结果彼此组合,这可能是或不是所需的,但问题中没有要求。问题只要求执行每个操作,逐个执行或并行执行(未指定,但链接的Bolts示例是关于并行执行的)。此外,当任何一个observable完成时,zip()将立即完成,因此它违反了要求。
对于Observable的并行执行,flatMap() 在其他答案中提供很好,但merge()会更加直接。请注意,如果你想要推迟退出直到所有observables完成,merge将在任何一个observable错误时退出,你应该看看mergeDelayError()
对于逐个执行,我认为应该使用Observable.concat()静态方法。它的javadoc如下所述:

concat(java.lang.Iterable> sequences) 将多个Observables按顺序连接成一个Observable

如果您不想要并行执行,那么下面这段话可能适合您的需求。

另外,如果您仅对任务完成感兴趣而不关心返回值,则应该考虑使用Completable而不是Observable

简而言之:如果需要单个任务的执行和完成事件的触发,我认为最适合使用Completable.concat()。对于并行执行,Completable.merge()或Completable.mergeDelayError()听起来是解决方案。前者在任何Completable出现错误时立即停止,后者会执行它们所有的任务,即使其中一个任务有错误,也只报告此错误。


3

使用 Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->

})

在函数的参数中设置类型非常重要,否则会导致编译错误。

最后一个参数类型随参数数量而变化: 当参数为2个时使用BiFunction 当参数为3个时使用Function3 当参数为4个时使用Function4 ...


2

您可能已经了解到可以使用zip操作符处理两个可观察对象的情况。

此外,还有静态方法Observable.zip。它有一种形式对您应该很有用:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

你可以查看javadoc了解更多内容。

1

我正在使用Kotlin和JavaRx Observables以及RxKotlin编写一些计算密集型代码。我想观察一个可观察对象列表,以便在完成时给我更新进度和最新结果。最后它会返回最佳的计算结果。额外的要求是并行运行Observables以利用所有CPU核心。我最终采用了以下解决方案:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {

    return Observable.create { subscriber ->
        Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        }).subscribeBy(
            onNext = {
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            },
            onComplete = {
                subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) 
                subscriber.onComplete()
            },
            onError = {
                subscriber.onError(it)
            }
        )
    }
}

不熟悉RxKotlin或@Volatile,但如果同时被多个线程调用,这将如何工作?结果会发生什么? - eis

1
我曾遇到类似的问题,需要从rest调用中获取搜索项,并将保存的建议从RecentSearchProvider.AUTHORITY集成到一起,形成一个统一的列表。我尝试使用@MyDogTom的解决方案,但很遗憾RxJava中没有Observable.from方法。经过一番研究,我找到了适合我的解决方案。
 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable { data->data }
        .flatMap {task -> task.observeOn(Schedulers.io())}
        .toList()
        .map { ArrayList(it) }
}

我从包含建议和结果列表的可观察数组中创建了一个可观察对象,具体取决于查询。然后,您只需使用flatMapIterable遍历这些任务,并使用flatmap运行它们,将结果放入数组中,稍后可以将其提取到循环视图中。

0
如果您使用Project Reactor,您可以使用。
Mono.when(publisher1, publisher2)
.map(i-> {
    System.out.println("everything is done!");
    return i;
}).block()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接