RxJava2 可观察流背压机制

6

最近我意识到自己不理解RxJava2的背压(backpressure)是如何工作的。

我进行了一项小测试,预计它应该会出现MissingBackpressureException异常:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

系统输出如下:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000

Processed:0
Processed:1
Processed:2
...
Processed:10000

为什么不会产生MissingBackpressureException?
我期望e.onNext(i);将项目放入ObservableObserveOn的缓冲区中,当其大小大于static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());时,应该抛出MissingBackpressureException。但是这并没有发生。缓冲区是否会自动增长?如果不会,那么项目存储在哪里?

1
RxJava2中的Observable不支持背压,只有Flowable支持。 - Tassos Bassoukos
3
我知道它们不支持反压,但我认为不支持意味着会抛出MissingBackpressureException异常,而不是自动增加缓冲区。 - Rostyslav Roshak
1
“Observable doesn't support backpressure” 的意思是当源头发出的项目比消费者处理它们的速度更快时,这些项目会无限制地缓冲,直到由于资源短缺而抛出 OutOfMemory 异常。然而,对于 Flowable,只要缓冲区(有界)已满,就会抛出 MissingBackpressureException。 - HiddenDroid
1个回答

3
那是因为在RxJava2中,背压只传递给了Flowable,详情请参见此处。如果你切换到使用BackpressureStrategy.MISSINGFlowable,你将会得到异常。
这也意味着在你的情况下确实有一个自动增长的缓冲区,从observerOn文档中可以看出:

修改ObservableSource以在指定的Scheduler上异步执行其发射和通知,并使用无限制的缓冲区...


谢谢您,请问一下,如果缓冲区是无界的,在Observable中为什么需要public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize)这个方法呢?我是否正确地理解了它表示缓冲区增长的大小呢? - Rostyslav Roshak
然而,我也认为根据文档,这是“岛屿”的可配置大小,它们是缓冲区的增量步骤。 - yosriz
@RostyslavRoshak 实际上,这个 bufferSize 参数仅适用于缓冲区的增量步骤。Observable 缓冲区的大小默认始终设置为 128 个元素,并根据需要逐步增加。 - HiddenDroid

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接