Java 8流和RxJava可观察对象的区别

160

Java 8的流(stream)和RxJava的可观察对象(observables)是否相似?

Java 8的流定义:

新的java.util.stream包中的类提供了一个流API,支持对元素流进行函数式操作。


8
FYI,有提议在JDK 9中引入更多类似于RxJava的类。http://jsr166-concurrency.10961.n7.nabble.com/jdk9-Candidate-classes-Flow-and-SubmissionPublisher-td11967.html - John Vint
@JohnVint 这个提案的进展如何?它是否真的会实施? - IgorGanapolsky
2
@IgorGanapolsky 是的,它肯定看起来会进入 jdk9。http://cr.openjdk.java.net/~martin/webrevs/openjdk9/jsr166-jdk9-integration/。甚至还有一个 RxJava 到 Flow 的端口 https://github.com/akarnokd/RxJavaUtilConcurrentFlow。 - John Vint
我知道这是一个非常老的问题,但最近我参加了Venkat Subramaniam的一次精彩演讲,他对这个主题有深刻的见解,并且已经更新到Java9:https://www.youtube.com/watch?v=kfSSKM9y_0E。对于深入研究RxJava的人来说可能很有趣。 - Pedro
7个回答

168

简短回答

所有序列/流处理库都提供非常相似的API用于构建管道。差异在于处理多线程和管道组合的API。

详细回答

RxJava与Stream有很大的不同。在所有JDK中,最接近rx.Observable的可能是java.util.stream.CollectorStream + CompletableFuture组合(这会增加额外的单子层处理成本,即必须处理Stream<CompletableFuture<T>>CompletableFuture<Stream<T>>之间的转换)。

Observable和Stream之间存在重大差异:

  • Streams是基于pull的,Observables是基于push的。这听起来可能太抽象了,但它有非常具体的重要影响。
  • Stream只能使用一次,Observable可以订阅多次。
  • Stream#parallel()将序列分成分区,Observable#subscribeOn()Observable#observeOn()不会;使用Observable模拟Stream#parallel()行为很棘手,曾经有.parallel()方法,但该方法引起了很多混乱,因此.parallel()支持被移动到单独的存储库中:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava。更多细节请参见另一个答案
  • Stream#parallel()不允许指定要使用的线程池,与大多数接受可选Scheduler的RxJava方法不同。由于JVM中所有流实例都使用相同的fork-join池,添加.parallel()可能会意外地影响程序的另一个模块的行为。
  • Streams缺少像Observable#interval()Observable#window()等与时间相关的操作;这主要是因为Streams是基于pull的,上游没有控制何时向下游发出下一个元素。
  • 与RxJava相比,Streams提供了受限的操作集。例如,Streams缺少截断操作(takeWhile()takeUntil());使用Stream#anyMatch()的解决方法是有限制的:它是终端操作,因此每个流只能使用一次。
  • 在JDK 8中,没有Stream#zip()操作,这有时非常有用。
  • Streams很难自己构建,Observable可以通过许多方式构建 编辑:如评论所述,有构建Stream的方法。但由于没有非终端短路,例如很难生成文件中行的Stream(JDK提供了Files#lines()BufferedReader#lines()),其他类似的情况可以通过从Iterator构建Stream来处理。
  • Observable提供资源管理功能(Observable#using());您可以将IO流或互斥锁包装在其中,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;Stream具有onClose(Runnable)方法,但必须手动调用或通过try-with-resources调用。例如,您必须记住Files#lines()必须被包含在try-with-resources块中。
  • Observables在整个过程中都是同步的(实际上我没有检查Streams是否也是如此)。这使您无需考虑基本操作是否线程安全(答案始终为“是”,除非存在错误),但与并发相关的开销将存在,无论您的代码是否需要它。

总结

RxJava与流(Streams)有很大的不同。真正的RxJava替代品是其他ReactiveStreams的实现,例如Akka的相关部分。

更新

有一个技巧可以使用非默认的fork-join池(Stream#parallel),请参见Java 8并行流中的自定义线程池

更新

以上所有内容都基于对RxJava 1.x的经验。现在RxJava 2.x已经发布, 这个答案可能已经过时了。


2
为什么流难以构建?根据这篇文章,似乎很容易:http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html - IgorGanapolsky
2
有相当数量的类具有“stream”方法:集合、输入流、目录文件等。但是,如果您想从自定义循环(例如,在数据库游标上迭代)中创建流怎么办?到目前为止,我发现最好的方法是创建一个Iterator,用Spliterator包装它,最后调用StreamSupport#fromSpliterator。对于简单的情况来说,这是太多的粘合剂了。还有Stream.iterate,但它会产生无限流。在这种情况下切断流的唯一方法是Stream#anyMatch,但它是一个终端操作,因此您无法将流生产者和消费者分开。 - Kirill Gamazkov
2
RxJava有Observable.fromCallable、Observable.create等方法。或者你可以安全地生成无限的Observable,然后使用'.takeWhile(condition)'方法,这样就可以将这个序列发送给消费者了。 - Kirill Gamazkov
1
流是不难自己构建的。您可以简单地调用 Stream.generate() 并传递自己的 Supplier<U> 实现,只需提供下一个流中的项目的一个简单方法即可。还有很多其他方法。要轻松构建依赖于先前值的序列 Stream,可以使用 iterate() 方法,每个 Collection 都有一个 stream() 方法,而 Stream.of() 则从 varargs 或数组构造一个 Stream。最后,StreamSupport 支持使用分裂器或流原始类型进行更高级的流创建。 - jbx
1
流缺少截断操作(takeWhile()takeUntil()); JDK9 在 takeWhile()dropWhile() 中提供了这些功能。 - Honinbo Shusaku
显示剩余4条评论

50

Java 8 Stream 和 RxJava 看起来非常相似。它们有类似的操作符(filter、map、flatMap...),但不是为了相同的用途而构建。

你可以使用 RxJava 执行异步任务。

使用 Java 8 Stream,您将遍历集合的项。

在 RxJava 中,您也可以做类似的事情(遍历集合的项),但是由于 RxJava 专注于并发任务,它使用同步、门闩等机制,因此使用 RxJava 进行相同的任务可能比使用 Java 8 Stream 更慢。

RxJava 可以与 CompletableFuture 相比较,但能够计算多个值。


12
值得注意的是,你所说的流遍历只适用于非并行流。parallelStream 支持类似的简单遍历、映射、过滤等同步操作。 - John Vint
2
我不认为“因此,使用RxJava执行相同任务可能比使用Java 8 stream更慢。”是普遍适用的,它严重取决于手头的任务。 - daschl
1
我很高兴你说“使用RxJava完成相同任务可能比使用Java 8流慢”。这是一个非常重要的区别,许多RxJava用户并不知道。 - IgorGanapolsky
6
@marcin-koziński,您可以查看此基准测试:https://twitter.com/akarnokd/status/752465265091309568 - dwursteisen
1
希望能有一个使用案例列表,为每个推荐使用J8或Rx流。 - Stephane
显示剩余2条评论

40

存在一些技术和概念上的差异,例如Java 8流是单次使用的、基于拉取的同步值序列,而RxJava Observables是可重新观察的、自适应推-拉基础,并且潜在地异步的值序列。RxJava 针对 Java6+ 并可在 Android 上运行。


4
典型的涉及RxJava的代码会大量使用Lambda表达式,这些表达式仅在Java 8及以上版本中才可用。因此,您可以在Java 6中使用Rx,但代码会变得冗长。 - Kirill Gamazkov
1
类似的区别是,Rx Observables 可以一直保持活力,直到取消订阅。Java 8 流默认情况下通过操作终止。 - IgorGanapolsky
2
@KirillGamazkov,当你的目标是Java 6时,你可以使用retrolambda来使你的代码更加优美。 - Marcin Koziński
1
Kotlin 看起来甚至比 Retrofit 更性感。 - Kirill Gamazkov

31

Java 8的Streams是基于拉取(pull)的。您迭代遍历Java 8流,逐个消耗每个项目。而且它可能是无限的流。

RXJava的Observable默认是基于推送(push)的。您订阅一个Observable,当下一个项目到达时(onNext),或者当流完成时(onCompleted)或出现错误时(onError),您将收到通知。 由于使用Observable您会接收到onNextonCompletedonError事件,所以可以进行一些强大的功能,例如将不同的Observable组合成一个新的(zipmergeconcat)。您还可以进行缓存、节流等操作。此外,它在不同的语言中使用的API基本相同(RxJava、C#中的RX、RxJS等)。

默认情况下,RxJava是单线程的。除非您开始使用Schedulers,否则所有操作都将在同一线程上执行。


在 Stream 中,你有 forEach,它几乎与 onNext 相同。 - paul
实际上,流通常是终端的。 "关闭流管道的操作称为终端操作。它们从管道中产生结果,例如列表、整数或甚至是 void(任何非流类型)"。~http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html - IgorGanapolsky

30
现有的答案全面且正确,但缺少初学者的清晰示例。让我为像“推/拉式”和“可重复使用”的术语提供一些具体的内容。注意:我讨厌术语“Observable”(它是一个流),所以将简单地称其为J8与RX流。
考虑一个整数列表,
digits = [1,2,3,4,5]

一个 J8 Stream 是一个修改集合的实用工具。例如,可以提取偶数数字,如下所示:
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

这基本上就是Python的map, filter, reduce,是Java非常好的(也是迫在眉睫的)补充。但是,如果数字没有提前收集,而是在应用程序运行时流入 - 我们能实时过滤偶数吗?
想象一下,在应用程序运行时,一个单独的线程进程在随机时间输出整数(---表示时间)。
digits = 12345---6------7--8--9-10--------11--12

在RX中,even可以对每个新数字做出反应,并实时应用过滤器。
even = -2-4-----6---------8----10------------12

不需要存储输入和输出列表。如果您想要一个输出列表,也没有问题,它也可以流式传输。事实上,一切都是流。

evens_stored = even.collect()  

这就是为什么“无状态”和“函数式”等术语更多地与RX相关联的原因。

但是5不是偶数...而且看起来J8 Stream是同步的,而Rx Stream是异步的? - Franklin Yu
1
@FranklinYu 谢谢,我已经修复了5个错别字。如果从命令式和函数式的角度来看,而不是同步和异步,虽然可能是正确的,但更重要的是。在J8中,您首先收集所有项目,然后第二个应用过滤器。在RX中,您独立于数据定义过滤器函数,然后将其与一个事件源(实时流或Java集合)相关联...这是完全不同的编程模型。 - Adam Hughes

4
RxJava也与响应式流计划密切相关,并将自己视为响应式流API的简单实现(例如与Akka流实现相比)。主要区别在于,响应式流被设计为能够处理反压力,但如果您查看响应式流页面,您会有所了解。他们非常好地描述了自己的目标,流也与响应式宣言密切相关。
Java 8流几乎是无限集合的实现,与Scala StreamClojure lazy seq非常相似。

2

Java 8流使处理大型集合变得高效,并利用多核体系结构。与之相反,RxJava默认情况下是单线程的(没有Schedulers)。因此,除非您自己编写逻辑,RxJava否则无法利用多核计算机。


4
默认情况下,Stream 是单线程的,除非您调用 .parallel() 方法。而 Rx 可以更好地控制并发。 - Kirill Gamazkov
@KirillGamazkov Kotlin协程流(基于Java8 Streams)现在支持结构化并发:https://kotlinlang.org/docs/reference/coroutines/flow.html#flows - IgorGanapolsky
真的,但我没有提到Flow和结构化并发。我的两个观点是:1)除非您明确更改,否则Stream和Rx都是单线程的;2)与流仅允许您说“以某种方式并行化”不同,Rx使您可以对要在哪个线程池上执行哪个步骤进行细粒度控制。 - Kirill Gamazkov
我真的不太明白“为什么需要线程池”的问题。就像你所说的,“为了高效地处理大型集合”。或者说,我想让任务中的IO绑定部分在单独的线程池上运行。我不认为我理解你问题背后的意图。你能再试一下吗? - Kirill Gamazkov
1
Schedulers类中的静态方法允许获取预定义的线程池,也可以从Executor创建一个线程池。请参阅http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/schedulers/Schedulers.html#from-java.util.concurrent.Executor-。 - Kirill Gamazkov
重点是利用多核,而不是多线程池。 - IgorGanapolsky

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接