我一直在研究新的RxJava2,并且我不太确定我是否还理解“背压”这个概念...
我知道我们有Observable
不支持backpressure
,而Flowable
则支持。
因此,基于示例,假设我有一个带有interval
的Flowable
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这段代码在大约128个值后就会崩溃,很明显我消耗的速度比获取项目的速度慢。
但是我们在Observable
中也有同样的问题。
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
即使我在消费时添加一些延迟,它也不会崩溃。为了让 Flowable
工作,我使用了 onBackpressureDrop
操作符,这样就可以避免崩溃,但是并没有发射所有的值。
因此,我目前无法在脑海中找到答案的基本问题是,当我可以使用普通的 Observable
接收所有值而不管理缓冲区时,为什么我要关心 backpressure
?或者从另一方面来说,backpressure
在管理和处理消费方面给我带来了哪些优势?