Java 8的流(stream)和RxJava的可观察对象(observables)是否相似?
Java 8的流定义:
新的
java.util.stream
包中的类提供了一个流API,支持对元素流进行函数式操作。
Java 8的流(stream)和RxJava的可观察对象(observables)是否相似?
Java 8的流定义:
新的
java.util.stream
包中的类提供了一个流API,支持对元素流进行函数式操作。
所有序列/流处理库都提供非常相似的API用于构建管道。差异在于处理多线程和管道组合的API。
RxJava与Stream有很大的不同。在所有JDK中,最接近rx.Observable
的可能是java.util.stream.Collector
Stream
+ CompletableFuture
组合(这会增加额外的单子层处理成本,即必须处理Stream<CompletableFuture<T>>
和CompletableFuture<Stream<T>>
之间的转换)。
Observable和Stream之间存在重大差异:
Stream#parallel()
将序列分成分区,Observable#subscribeOn()
和Observable#observeOn()
不会;使用Observable模拟Stream#parallel()
行为很棘手,曾经有.parallel()
方法,但该方法引起了很多混乱,因此.parallel()
支持被移动到单独的存储库中:ReactiveX/RxJavaParallel: Experimental Parallel Extensions for RxJava。更多细节请参见另一个答案。Stream#parallel()
不允许指定要使用的线程池,与大多数接受可选Scheduler的RxJava方法不同。由于JVM中所有流实例都使用相同的fork-join池,添加.parallel()
可能会意外地影响程序的另一个模块的行为。Observable#interval()
、Observable#window()
等与时间相关的操作;这主要是因为Streams是基于pull的,上游没有控制何时向下游发出下一个元素。takeWhile()
、takeUntil()
);使用Stream#anyMatch()
的解决方法是有限制的:它是终端操作,因此每个流只能使用一次。Stream#zip()
操作,这有时非常有用。Files#lines()
和BufferedReader#lines()
),其他类似的情况可以通过从Iterator构建Stream来处理。Observable#using()
);您可以将IO流或互斥锁包装在其中,并确保用户不会忘记释放资源 - 它将在订阅终止时自动释放;Stream具有onClose(Runnable)
方法,但必须手动调用或通过try-with-resources调用。例如,您必须记住Files#lines()
必须被包含在try-with-resources块中。RxJava与流(Streams)有很大的不同。真正的RxJava替代品是其他ReactiveStreams的实现,例如Akka的相关部分。
有一个技巧可以使用非默认的fork-join池(Stream#parallel
),请参见Java 8并行流中的自定义线程池。
以上所有内容都基于对RxJava 1.x的经验。现在RxJava 2.x已经发布, 这个答案可能已经过时了。
Stream.generate()
并传递自己的 Supplier<U>
实现,只需提供下一个流中的项目的一个简单方法即可。还有很多其他方法。要轻松构建依赖于先前值的序列 Stream
,可以使用 iterate()
方法,每个 Collection
都有一个 stream()
方法,而 Stream.of()
则从 varargs 或数组构造一个 Stream
。最后,StreamSupport
支持使用分裂器或流原始类型进行更高级的流创建。 - jbxJava 8 Stream 和 RxJava 看起来非常相似。它们有类似的操作符(filter、map、flatMap...),但不是为了相同的用途而构建。
你可以使用 RxJava 执行异步任务。
使用 Java 8 Stream,您将遍历集合的项。
在 RxJava 中,您也可以做类似的事情(遍历集合的项),但是由于 RxJava 专注于并发任务,它使用同步、门闩等机制,因此使用 RxJava 进行相同的任务可能比使用 Java 8 Stream 更慢。
RxJava 可以与 CompletableFuture
相比较,但能够计算多个值。
parallelStream
支持类似的简单遍历、映射、过滤等同步操作。 - John Vint存在一些技术和概念上的差异,例如Java 8流是单次使用的、基于拉取的同步值序列,而RxJava Observables是可重新观察的、自适应推-拉基础,并且潜在地异步的值序列。RxJava 针对 Java6+ 并可在 Android 上运行。
Java 8的Streams是基于拉取(pull)的。您迭代遍历Java 8流,逐个消耗每个项目。而且它可能是无限的流。
RXJava的Observable默认是基于推送(push)的。您订阅一个Observable,当下一个项目到达时(onNext
),或者当流完成时(onCompleted
)或出现错误时(onError
),您将收到通知。
由于使用Observable
您会接收到onNext
、onCompleted
和onError
事件,所以可以进行一些强大的功能,例如将不同的Observable
组合成一个新的(zip
、merge
、concat
)。您还可以进行缓存、节流等操作。此外,它在不同的语言中使用的API基本相同(RxJava、C#中的RX、RxJS等)。
默认情况下,RxJava是单线程的。除非您开始使用Schedulers,否则所有操作都将在同一线程上执行。
digits = [1,2,3,4,5]
evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())
---
表示时间)。digits = 12345---6------7--8--9-10--------11--12
even
可以对每个新数字做出反应,并实时应用过滤器。even = -2-4-----6---------8----10------------12
不需要存储输入和输出列表。如果您想要一个输出列表,也没有问题,它也可以流式传输。事实上,一切都是流。
evens_stored = even.collect()
Java 8流使处理大型集合变得高效,并利用多核体系结构。与之相反,RxJava默认情况下是单线程的(没有Schedulers)。因此,除非您自己编写逻辑,RxJava否则无法利用多核计算机。