Observable和Flowable在rxJava2中的区别

151

我一直在研究新的RxJava2,并且我不太确定我是否还理解“背压”这个概念...

我知道我们有Observable不支持backpressure,而Flowable则支持。

因此,基于示例,假设我有一个带有intervalFlowable

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

这段代码在大约128个值后就会崩溃,很明显我消耗的速度比获取项目的速度慢。

但是我们在Observable中也有同样的问题。

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

即使我在消费时添加一些延迟,它也不会崩溃。为了让 Flowable 工作,我使用了 onBackpressureDrop 操作符,这样就可以避免崩溃,但是并没有发射所有的值。

因此,我目前无法在脑海中找到答案的基本问题是,当我可以使用普通的 Observable 接收所有值而不管理缓冲区时,为什么我要关心 backpressure?或者从另一方面来说,backpressure 在管理和处理消费方面给我带来了哪些优势?


2
使用哪种类型?Observable 还是 Flowable? - Eido95
3个回答

137

在实践中,背压表现为有界缓冲区,Flowable.observeOn有一个128个元素的缓冲区,被尽可能快地排空。您可以单独增加此缓冲区大小以处理突发源,并且所有的背压管理实践仍适用于1.x。Observable.observeOn具有无界缓冲区,会持续收集元素,可能导致应用程序耗尽内存。

例如,您可以使用Observable

  • 处理GUI事件
  • 使用短序列(总共少于1000个元素)

例如,您可以使用Flowable

  • 冷源和非定时源
  • 生成器样式的源
  • 网络和数据库访问器

由于这个问题在另一个问题中已经出现 - 当语义上适当时,更受限制的类型(例如MaybeSingleCompletable)是否总是可以替代Flowable - david.mihola
2
是的,MaybeSingleCompletable这些类型太小了,根本不需要背压概念。生产者不可能比消费者更快地发出项目,因为最多只会产生或消耗0-1个项目。 - AndrewF
也许我不对,但对我来说,Flowable和Observable的示例应该交换。 - Yura Galavay
我认为在这个问题中,他缺少了我们需要提供给Flowable的背压策略,这解释了为什么会抛出缺少背压异常,也解释了为什么在应用.onBackpressureDrop()之后此异常消失。对于Observable而言,由于它没有这个策略并且无法提供一个,所以它最终会因OOM而失败。 - Haomin

135

当你的可观察对象(发布者)创建的事件比您的订阅者能处理的还多时,就会出现背压。这样,您可能会遗漏事件,或者您可能会得到一个巨大的事件队列,最终导致内存不足。 Flowable 考虑了背压问题。而Observable没有考虑。就是这样。

这使我想起漏斗,当其中液体过多时会溢出。 Flowable 可以帮助避免这种情况:

有很大的背压:

enter image description here

但是使用 Flowable 后,背压就明显减少了:

enter image description here

Rxjava2 有一些背压策略,根据您的用例可以使用它们。 策略是指 Rxjava2 提供一种处理因溢出(背压)而无法处理的对象的方法。

这里是策略。 我不会介绍所有策略,但例如,如果您不想担心溢出的项目,可以使用如下的丢弃策略:

observable.toFlowable(BackpressureStrategy.DROP)

据我所知,队列应该有 128 个项目的限制,超过这个限制就会发生溢出(背压)。即使不是 128,也接近该数字。希望这对某人有所帮助。

如果需要将缓冲区大小从 128 更改为其他值,似乎可以像这样进行操作(但请注意任何内存限制):

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

在软件开发中,“back pressure”策略通常意味着你告诉发射器稍微放慢一些速度,因为消费者无法处理你正在发射的事件。


我一直认为反压是一组机制的名称,这些机制可以让消费者通知生产者减慢速度... - kboom
可能是这样。是的。 - j2emanue
1
使用Flowable有什么缺点吗? - IgorGanapolsky
2
这些图像在欺骗我。丢弃事件并不会让底部出现“更多的钱”。 - EpicPandaForce
1
@j2emanue,你把操作符的缓冲区大小和Flowable.buffer(int)操作符搞混了。请仔细阅读JavaDocs并相应地修正你的答案: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html - tomek

16
在处理流(Flowable)时,如果没有使用背压机制,它在发出 128 个值后崩溃,并不意味着每次都会在发出 128 个值后崩溃:有时它可能在发出 10 个值后就崩溃了,而有时则可能根本不会崩溃。我猜当您使用 Observable 示例时,可能是因为没有背压概念的存在,所以代码可以正常工作,但下一次就可能不行了。 RxJava 2 中的区别在于,Observable 不再支持背压概念,也没有处理方式。如果您正在设计一个可能需要显式背压处理的反应式序列,则 Flowable 是您的最佳选择。请注意保留原 HTML 标签。

是的,我观察到有时它在较少的值之后就会中断,有时则不会。但是,如果例如我只处理“间隔”而没有“背压”,那么我是否应该期望一些奇怪的行为或问题? - user2141889
如果你确信在特定的Observable序列中不会出现背压问题,那么我想忽略背压也是可以的。 - Egor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接