如何在RxJava2中链接两个Completable

84

我有两个 Completable 。我想按照以下场景进行操作: 如果第一个 Completable 到达 onComplete ,则继续使用第二个 Completable。最终结果将是第二个 Completable 的 onComplete。

当我拥有 Single getUserIdAlreadySavedInDevice() 和 Completable login() 时,我是这样做的:

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) {
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))

}
5个回答

140

你正在寻找andThen操作符。

返回一个Completable,先执行这个Completable,然后再执行另一个Completable

firstCompletable
    .andThen(secondCompletable)

一般来说,这个操作符是 Completable 上的 flatMap 的“替代品”:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)

3
这应该被标记为正确的答案。我遇到了与问题发起人相同的情况,并且已经搜索了2天的答案。你的答案@Maxim Ostrovidov就是我一直在找的。谢谢! - chip
2
文档没有说明在第一个可完成项出错的情况下是否执行第二个可完成项。有人知道吗? - BoD
2
@BoD 如果流中发生任何错误,它将通过onError事件终止。这是反应式流的一般行为。因此,回答您的问题,第二个Completable不会被执行。 - Maksim Ostrovidov
11
我认为 andThen 并不能替代 flatMapCompletable,因为 andThen 接受一个 Completable 实例,而 flatMapCompletable 接受一个创建 Completable 的函数。前者是急切的,在调用时创建 Completable,而后者是惰性的,Completable 完成后才会被构建。对于急切版本,你可能在前一个 Completable 完成之前就开始构造 Completable,对吧? - Galder Zamarreño
1
Zamanneno是正确的。你必须用Single.fromCallable(() -> eagerCompletable).flatMapCompletable(c -> c)来包装任何急切的东西。这刚刚咬了我一口。 - Sparky
显示剩余3条评论

135
TL;DR: 其他答案忽略了一个细节。如果需要concat相当的效果,请使用doThingA().andThen(doThingB()),如果需要flatMap相当的效果,请使用doThingA().andThen(Completable.defer(() -> doThingB())

编辑:更完整的参考

  • flatMap()merge()的映射版本
  • concatMap()concat()的映射版本
  • 对于Completable,您需要使用defer()使函数调用惰性执行,就像在SingleObservable的映射函数中一样(或者最好是直到订阅时才发生任何事情-这是一个很好的约定,并且在官方的Rx库以及我遇到的任何Rx扩展中都被使用,对于高级用户,这仅适用于冷completable,但大多数人可以忽略这一点)。
  • concat(a, b)a.andThen(b)之间唯一的区别是语法

一些示例:

  • foo(a).andThen(bar(b))将:

    1. 调用foo(a)
    2. 即使步骤1返回错误,也会立即调用bar(b)
    3. 订阅步骤1返回的任何内容
    4. 如果最后一步成功完成,那么将会订阅bar(b)的结果,并且仅在这种情况下
  • foo(a).andThen(Completable.defer(() -> bar(b))将会:

    1. 调用foo(a)
    2. 订阅第1步的结果
    3. 仅当由foo(a)返回的completable成功后才调用bar(b)

我会省略对merge()的处理,因为它变得更加复杂,但简而言之,如果您想要"并行性",那么就需要调用那个方法。


上述答案有点正确,但我认为它们具有误导性,因为它们忽略了关于急切求值的微妙之处。

doThingA().andThen(doThingB())将立即调用doThingB(),但仅在由doThingA()返回的observable完成时才订阅到doThingB()返回的observable。

doThingA().andThen(Completable.defer(() -> doThingB())只会在thing A完成后才会调用doThingB()

这只在doThingB()在订阅事件之前具有副作用时才重要。例如:Single.just(sideEffect(1)).toCompletable()

一个在订阅事件之前没有副作用的实现(真正的冷可观察对象)可能是 Single.just(1).doOnSuccess(i -> sideEffect(i)).toCompletable()

在我遇到的情况中,A 是一些验证逻辑,而 doThingB() 立即启动异步数据库更新,以完成 VertX ObservableFuture。这很糟糕。可以说,doThingB() 应该被编写为只在订阅时更新数据库,我将来会尝试以这种方式设计事物。


但我同意,如果您的意思是这样的话,重构doThingB()会更好。 - Sparky
相信这段代码:Completable.fromAction(() -> System.out.println("foo")) .andThen(Completable.error(new RuntimeException("bam!"))) .andThen(Completable.fromAction(() -> System.out.println("Hello World"))).await(); - Sparky
defer()fromAction()的二阶等效函数,就像flatMap()map()的二阶等效函数一样,因此它们在这里是等效的。 - Sparky
@Sparky,这是错误的:“即使步骤1返回一个错误,也立即调用bar(b)”。尝试这个:Completable.error(new RuntimeException("Boom")).andThen((CompletableObserver o) -> System.out.println("Never happens")).subscribe(); - igobivo
@igobivo 不是,假设您想要的例子是 foo(a).andThen(bar(b)),则您误读了它,这是标准函数优先级,而不是任何响应式的内容。您想的是如果它是 foo(a).andThen((CompletableObserver o) -> bar(b)) 时会发生什么。问题出在人们犯了初学者的错误,并将 bar(b) 定义为类似于 Completable bar(String b) { if(someDatabaseOperation(b)) { return Completable.complete(); } else { return Completable.error(); } } 的东西。我至少在野外见过这种情况两次。 - Sparky
显示剩余13条评论

5
请注意,这里得票更多的答案有点误导。请看下面的例子,我的想法是展示一些测试场景,并展示带有andThen操作符的完整逻辑的行为。
 private fun doThingAFail(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
            throw(Exception("The excep"))
        }
    }

    private fun doThingB(): Completable {
        print("Do thingB Called\n")
        return Completable.fromCallable {
            print("calling stream B\n")

        }
    }

    private fun doThingA(): Completable {
        print("Do thingA Called\n")
        return Completable.fromCallable {
            print("calling stream A\n")
        }
    }

请注意以下测试:

@Test
fun testCallAPlusB() {
    doThingA().andThen(doThingB())
}

输出结果将是:

Do thingA Called
Do thingB Called

这里有一个小提示:请注意,我们在这个代码片段中没有订阅这些 Completable。

测试内容:

@Test
fun theTestSubscribe() {
    doThingA().andThen(doThingB()).subscribe()
}

输出结果应该是:
Do thingA Called
Do thingB Called
calling stream A
calling stream B


最后,如果第一个可完成项失败,第二个可完成项将不会被执行。
@Test
fun theTestFailThingA() {
    doThingAFail().andThen(doThingB()).subscribe()
}

输出结果将会是:
Do thingA Called
Do thingB Called
calling stream A

这里的关键概念是方法内部和observable内部的逻辑不同时执行。 当我们调用 doThingA()doThingB() 方法时,"Do thingA Called" 和 "Do thingB Called" 行将被打印。而 "calling stream A" 和 "calling stream B" 行仅在有人订阅doThingAdoThingB 方法时才会被调用。
第二个概念是 andThen 操作符如何处理错误。在上面的例子中,如果 doThingA() completable 出现错误,将终止流,并且不会打印 "calling stream B" 行。

5

Concat会导致与上述相同的问题。这将立即得到解决。因此,如果您真的依赖于您的调用之前,请坚持使用doThingA().andThen(Completable.defer(() -> doThingB())。 - Highriser

0

我遇到了同样的问题,但是我使用了 .concactWith 运算符来使它正常工作。 在我的情况下,我有两个类型为 Completable 的 fun。

fun makeTwoThings(): Completable {
    makeFirstThing().concatWith(makeSecondThing())
}

fun makeFirstThing(): Completable{
     //TODO()
}

fun makeSecondThing(): Completable{
     //TODO()
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接