RxJava 合并 Flowable 和 Completable

4
我有一个不断发出项目但从不调用onErroronCompleteFlowable。现在我有一个Completable,我想将其与此Flowable合并,以便当Completable完成时,Flowable会调用onComplete。我无法直接更改给我的Flowable对象。
其中一个问题是,我本来会在Flowable上使用takeUntil,但是Flowable可能会在任何时间停止发出项目,而我仍希望Completable能够调用onComplete
更新: 由于我们可以使用Completable.toFlowable()合并两个Flowables。问题是,我仍然找不到一种同时完成两者的方法。
2个回答

2

正如您所指出的,您无法更改原始Flowable本身,因此它不会发出onComplete。 但是,您可以通过以下方式使结果Flowable发出它(伪代码):

val f: Flowable = ...
val c: Completable = ...
val r: Flowable = f.materialize().mergeWith(c.materialize()).dematerialize()


这并不能实现我的功能,因为mergeWith(Completable)需要Flowable也完成。 - HaydenKai
你试过了吗?它不需要流式布局就能完成。 - Maxim Volgin
我已经尝试过了,mergeWith 的文档说“只有当另一个 CompletableSource 也完成时才会完成”。 - HaydenKai
这不应该有影响,因为我们从Completable中去除了onComplete的物化。无论如何,您可以尝试使用静态的.merge()方法。 - Maxim Volgin
.merge() 等待两个源都完成 - HaydenKai

1
这里有一个通用解决方案。正如@Maxim的回答中所指出的,merge/mergeWith不能实现我的功能,因为它需要FlowableCompletable都完成。这个解决方案还可以正确处理两者的释放。
    Flowable<Integer> f = Flowable.fromArray(1, 2, 3);
    Completable c = Completable.complete();

    final PublishSubject<Integer> subject = PublishSubject.create();
    final CompositeDisposable cd = new CompositeDisposable();
    Flowable result = subject
            .doOnSubscribe(__ -> {
                cd.add(f.subscribe(subject::onNext));
                cd.add(c.subscribe(subject::onComplete));
            })
            .doOnDispose(cd::dispose)
            .toFlowable(BackpressureStrategy.LATEST);

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接