RxJava2 subscribe 在一段时间后停止观察,但在Flowable完成时继续观察。

4

我很难理解下面代码示例的行为:

    Flowable<String> f = Flowable.just(1)
            .flatMap(it -> Flowable.create(e -> {

                for(int i = 1; i < 1001; ++i) {
                    log.info("Emitting: " + i);
                    if(i % 10 == 0) {
                        Thread.sleep(1000);
                    }
                    e.onNext(i);
                }

                e.onComplete();
            }, BackpressureStrategy.BUFFER))
            .map(String::valueOf)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread());

    f.subscribe(val -> {
        Thread.sleep(100);
        log.info("Observing: " + val);
    });

    Thread.sleep(1000000);
subscribe调用观察到128个项目时,代码可以正常工作。发射和观察是并行的。但在此之后,Flowable继续发射项目(显然在某个地方排队),但在发射了所有1000个项目之前,没有观察到任何项目。发射完所有1000个项目后,剩余的项目(> 128)将一次性被观察到。
这似乎与后压缓冲区大小为128有关,但我仍然期望发射和观察在整个1000个项目中都是并行的,因为观察者显然不比发射器慢。这里有什么我漏掉的东西吗?我该怎么修改代码呢?
1个回答

5
这是由于创建和subscribeOn之间存在同一池死锁的问题:
如果链中有一个create(FlowableOnSubscribe, BackpressureStrategy)类型的源,建议使用subscribeOn(scheduler, false),以避免同一池死锁,因为请求可能会在急切/阻塞的发射器后面积累。
//...
.subscribeOn(Schedulers.io(), false)
//...

编辑:

我曾尝试用Flowable.range替换Flowable.create来测试原始示例(还使用了你建议的修复方法),但我并没有遇到问题。你能否举个例子说明在什么情况下可能会出现问题?

Flowable.range(1, 10)
    .subscribeOn(Schedulers.io(), false)
    .doOnNext(v -> System.out.println(Thread.currentThread().getName()))
    .observeOn(Schedulers.single(), false, 1)
    .blockingSubscribe();

最初会打印RxCachedThreadScheduler-1,然后打印RxSingleScheduler-1 9次,因为observeOn的补给请求会在单个调度程序上运行,而不是被路由回io调度程序。请尝试将subscribeOn设置为true。


谢谢。也许这应该是默认行为?对我来说,很难追踪到这个问题所在... - Ertan D.
不行,因为这只影响到 Flowable.create,并且会干扰其他源,因为对于它们来说,subscribeOn 必须将发射固定到特定的调度程序。使用命令式的 create,事实上你正在“阻塞”发射线程,因此没有必要也没有能力回到由下游请求触发的那个线程。 - akarnokd
所以,我的理解是subscribeOn(Schedulers.io(), false)可能会与其他流产生问题,对吗? 我有一个共同的地方,在那里我有这个subscribeOn调用,并且我的来源可能不同,有些是Flowable.create,有些则不是。我尝试了原始示例(加上您建议的修复),将Flowable.create替换为Flowable.range ,但我没有遇到任何问题。您能否举个例子说明什么情况下可能会出现问题? - Ertan D.
谢谢你的示例。然而,当我在doOnNext之前使用observeOn时,无论subscribeOn是true还是false,它都会以相同的方式工作。在我的使用案例中,我总是先调用subscribeOn,然后立即调用observeOn。因此,对于我的使用情况,你的修复看起来不错,希望不会对没有Flowable.create的源代码造成任何问题。 - Ertan D.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接