CompletableFuture、Future和RxJava的Observable之间的区别是什么?

272
我想知道CompletableFuture、Future和Observable RxJava之间的区别。它们都是异步的,但是Future.get()会阻塞线程,CompletableFuture提供回调方法,而RxJava Observable类似于CompletableFuture并具有其他优点(不确定)。例如:如果客户端需要进行多个服务调用,并且使用Futures(Java)时,Future.get()将按顺序执行...想知道在RxJava中如何更好地实现。文档http://reactivex.io/intro.html表示:使用Futures难以最优地组合条件异步执行流(或不可能,因为每个请求的延迟在运行时会有所不同)。当然,这可以做到,但很快就会变得复杂(因此容易出错)或者过早地阻止Future.get(),这会消除异步执行的好处。

我非常想知道RxJava是如何解决这个问题的。但我发现文档中的内容很难理解。


你阅读了每个的文档吗?我对RxJava完全不熟悉,但是一眼看去文档似乎非常详尽。它似乎并不特别类似于这两个futures。 - FThompson
我已经阅读过了,但是无法理解它与Java futures有何不同...如果我错了,请纠正我。 - shiv455
我猜这两者都用于异步操作。 - shiv455
2
想知道它们的不同之处,例如线程管理方面是否不同?例如:Future.get() 阻塞线程...在 Observable 中如何处理? - shiv455
2
至少对我来说有点困惑...一个高层次的区别将非常有帮助!! - shiv455
显示剩余4条评论
5个回答

409

期货

Future是在Java 5(2004年)中引入的。它们基本上是尚未完成操作结果的占位符。一旦操作完成,Future将包含该结果。例如,操作可以是RunnableCallable实例,这些实例被提交给ExecutorService。操作的提交者可以使用Future对象来检查操作是否isDone(), 或使用阻塞的get()方法等待操作完成。

例子:

/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

CompletableFutures

CompletableFutures 是在 Java 8(2014 年)中引入的。它们实际上是常规 Futures 的演变,受到 Google 的 Listenable FuturesGuava 库的启发。它们是 Futures,还允许您将任务串联在一起。您可以使用它们告诉某个工作线程“去执行某个任务 X,当完成后,使用 X 的结果执行另一个任务”。使用 CompletableFutures,您可以在不阻塞线程等待结果的情况下对操作结果进行处理。下面是一个简单的示例:

/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava是Netflix创建的整个用于反应式编程的库。一眼看上去,它似乎与Java 8的流类似。实际上,它比后者更加强大。

与Futures类似,RxJava可以用于串联一系列同步或异步操作以创建处理管道。与只能单次使用的Futures不同,RxJava适用于零个或多个项的,包括具有无限数量项的永久流。由于拥有一个难以置信丰富的运算符集合,因此它也更加灵活和强大。

与Java 8的流不同,RxJava还具有背压机制,可以处理处理管道中不同部分在不同线程上以不同速率运行的情况。
RxJava的缺点是尽管有完整的文档,但由于涉及范式转变,它是一个具有挑战性的库。如果涉及多个线程,甚至更糟的是 - 如果需要背压,则Rx代码也可能难以调试。
如果您想了解更多信息,请访问官方网站上的页面,其中包含各种教程,以及官方文档Javadoc。您还可以观看一些视频,例如这个视频,它简要介绍了Rx并讨论了Rx和Futures之间的区别。

奖励:Java 9 响应式流

Java 9的响应式流,又称Flow API,是由各种响应式流库(如RxJava 2Akka StreamsVertx)实现的一组接口。它们允许这些响应式库相互连接,同时保留了重要的反压功能。


2
最好能够提供Rx如何实现此功能的示例代码。 - RomanKousta
1
@IgorGanapolsky 是的。 - Malt
1
在CompletableFuture中,我们使用回调方法,如果一个方法的输出是另一个回调的输入,则这些回调方法也会被阻塞。就像Future使用Future.get()调用一样,因此被称为阻塞调用。但是,为什么说CompletableFutures不会阻塞?请解释一下。 - Deepak
2
@Federico 当然可以。每个“Future”都是一个占位符,用于表示可能已经完成或尚未完成的单个结果。如果您再次执行相同的操作,则会获得一个新的“Future”实例。RxJava处理可以随时到达的结果流。因此,一系列操作可以返回一个单个的RxJava可观察对象,该对象将输出一堆结果。这有点像单个邮政信封和不断推出邮件的气动管道之间的区别。 - Malt
1
@srk 是的。它会阻塞直到计算完成。 - Malt
显示剩余10条评论

27

我从 RxJava 0.9 开始使用,现在已经用到了 1.3.2,不久会迁移到 2.x。我在一个私人项目中使用它,已经使用了8年。

我再也不能不用这个库来编程了。一开始我持怀疑态度,但你需要创造一种完全不同的思维方式,这在一开始是相当困难的。我有时候要看着那些弹珠图案几个小时...哈哈

只需练习和真正了解流程(即observables和observer之间的契约),一旦你达到那里,你就会厌恶以其他方式进行编程。

对我来说,这个库没有真正的缺点。

用例:

我有一个监视器视图,包含9个仪表(CPU、内存、网络等...)。启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个observable(interval),其中包含9个计量器的所有数据。 每秒钟它将向视图推送一个新结果(因此不是轮询!)。 该observable使用flatmap同时(异步!)从9个不同的源获取数据,并将结果压缩成一个新模型,该模型将在onNext()方法中传递给视图。

你如何使用 futures、completables 等来完成这个操作...祝你好运! :)

RxJava 对我在编程方面解决了许多问题,并使得许多事情变得更加容易…

优势:

  • 无状态!!!(重要的一点,也可能是最重要的)
  • 开箱即用的线程管理
  • 构建具有自己生命周期的序列
  • 所有东西都是 observables,因此链接很容易
  • 编写的代码更少
  • 只需一个类路径下的单个 jar 文件(非常轻量级)
  • 高并发
  • 不再有回调地狱
  • 基于订阅者(消费者和生产者之间的紧密契约)
  • 反压策略(类似于断路器)
  • 出色的错误处理和恢复
  • 非常好的文档(弹珠图案 <3)
  • 完全控制
  • 更多的……

缺点: - 难以测试


20
“我再也不会编写没有这个库的程序了。” 那么 RxJava 对于所有软件项目来说都是最终解决方案吗? - IgorGanapolsky
2
即使我没有异步事件流,这个有用吗? - Mukesh Verma
...或者说事件流是来自操作系统(重绘、鼠标、键盘输入等)? - Thomas S.

7
Java的Future是一个占位符,用于保存将来使用阻塞API完成的任务。您需要使用它的isDone()方法定期轮询以检查该任务是否已完成。当然,您可以实现自己的异步代码来管理轮询逻辑。但是,这会产生更多的样板代码和调试开销。
Java的CompletableFuture是由Scala的Future创新的。它带有内部回调方法。一旦完成,回调方法将被触发并告诉线程应执行下游操作。这就是为什么它具有thenApply方法来对包装在CompletableFuture中的对象进行进一步操作的原因。
RxJava的Observable是CompletableFuture的增强版。它允许您处理背压(backpressure)。在我们上面提到的thenApply方法(甚至包括其兄弟thenApplyAsync)中,可能会出现这种情况:下游方法想要调用一个有时可能无法使用的外部服务。在这种情况下,CompleteableFuture将完全失败,您必须自己处理错误。然而,Observable允许您处理背压并在外部服务可用后继续执行。
此外,还有一个类似于Observable的接口:Flowable。它们被设计用于不同的目的。通常,Flowable专门用于处理冷和非定时操作,而Observable专门用于处理需要即时响应的执行。请参见官方文档:https://github.com/ReactiveX/RxJava#backpressure

5

这三个接口都用于将生产者的值传输给消费者。消费者可以分为两种类型:

  • 同步:消费者进行阻塞调用,当值准备好时返回
  • 异步:当值准备好时,调用消费者的回调方法

此外,通信接口在其他方面也有所不同:

  • 能够传输单个值或多个值
  • 如果有多个值,则可以支持背压或不支持

因此:

  • Future 使用同步接口传输单个值

  • CompletableFuture 使用同步和异步接口传输单个值

  • Rx 使用具有背压的异步接口传输多个值

此外,所有这些通信设施都支持传输异常。但这并非总是如此。例如,BlockingQueue 不支持异常传输。


3

CompletableFuture相较于普通Future的主要优势在于它利用了强大的Stream API并为您提供回调处理程序来链接任务,而如果使用普通Future则完全不存在这种优势。除此之外,CompletableFuture还提供异步架构,因此在处理计算密集型的map-reduce任务时,CompletableFuture是首选,而不必过多担心应用程序性能。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接