最近我意识到自己不理解RxJava2
的背压(backpressure)是如何工作的。
我进行了一项小测试,预计它应该会出现MissingBackpressureException
异常:
@Test
public void testBackpressureWillFail() {
Observable.<Integer>create(e -> {
for (int i = 0; i < 10000; i++) {
System.out.println("Emit: " + i);
e.onNext(i);
}
e.onComplete();
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.doOnNext(i -> {
Thread.sleep(100);
System.out.println("Processed:" + i);
})
.blockingSubscribe();
}
系统输出如下:
Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000
Processed:0
Processed:1
Processed:2
...
Processed:10000
为什么不会产生MissingBackpressureException?
我期望e.onNext(i);将项目放入ObservableObserveOn的缓冲区中,当其大小大于static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());时,应该抛出MissingBackpressureException。但是这并没有发生。缓冲区是否会自动增长?如果不会,那么项目存储在哪里?