我非常想知道RxJava
是如何解决这个问题的。但我发现文档中的内容很难理解。
我非常想知道RxJava
是如何解决这个问题的。但我发现文档中的内容很难理解。
期货
Future是在Java 5(2004年)中引入的。它们基本上是尚未完成操作结果的占位符。一旦操作完成,Future
将包含该结果。例如,操作可以是Runnable或Callable实例,这些实例被提交给ExecutorService。操作的提交者可以使用Future
对象来检查操作是否isDone(), 或使用阻塞的get()方法等待操作完成。
例子:
/**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Thread.sleep(1000);
return 1;
}
}
public static void main(String[] args) throws Exception{
ExecutorService exec = Executors.newSingleThreadExecutor();
Future<Integer> f = exec.submit(new MyCallable());
System.out.println(f.isDone()); //False
System.out.println(f.get()); //Waits until the task is done, then prints 1
}
CompletableFutures
CompletableFutures 是在 Java 8(2014 年)中引入的。它们实际上是常规 Futures 的演变,受到 Google 的 Listenable Futures 和 Guava 库的启发。它们是 Futures,还允许您将任务串联在一起。您可以使用它们告诉某个工作线程“去执行某个任务 X,当完成后,使用 X 的结果执行另一个任务”。使用 CompletableFutures,您可以在不阻塞线程等待结果的情况下对操作结果进行处理。下面是一个简单的示例:
/**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {
@Override
public Integer get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
//Do nothing
}
return 1;
}
}
/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {
@Override
public Integer apply(Integer x) {
return x + 1;
}
}
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
System.out.println(f.isDone()); // False
CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}
RxJava
RxJava是Netflix创建的整个用于反应式编程的库。一眼看上去,它似乎与Java 8的流类似。实际上,它比后者更加强大。
与Futures类似,RxJava可以用于串联一系列同步或异步操作以创建处理管道。与只能单次使用的Futures不同,RxJava适用于零个或多个项的流,包括具有无限数量项的永久流。由于拥有一个难以置信丰富的运算符集合,因此它也更加灵活和强大。
与Java 8的流不同,RxJava还具有背压机制,可以处理处理管道中不同部分在不同线程上以不同速率运行的情况。奖励:Java 9 响应式流
Java 9的响应式流,又称Flow API,是由各种响应式流库(如RxJava 2、Akka Streams和Vertx)实现的一组接口。它们允许这些响应式库相互连接,同时保留了重要的反压功能。
我从 RxJava 0.9 开始使用,现在已经用到了 1.3.2,不久会迁移到 2.x。我在一个私人项目中使用它,已经使用了8年。
我再也不能不用这个库来编程了。一开始我持怀疑态度,但你需要创造一种完全不同的思维方式,这在一开始是相当困难的。我有时候要看着那些弹珠图案几个小时...哈哈
只需练习和真正了解流程(即observables和observer之间的契约),一旦你达到那里,你就会厌恶以其他方式进行编程。
对我来说,这个库没有真正的缺点。
用例:
我有一个监视器视图,包含9个仪表(CPU、内存、网络等...)。启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个observable(interval),其中包含9个计量器的所有数据。 每秒钟它将向视图推送一个新结果(因此不是轮询!)。 该observable使用flatmap同时(异步!)从9个不同的源获取数据,并将结果压缩成一个新模型,该模型将在onNext()方法中传递给视图。
你如何使用 futures、completables 等来完成这个操作...祝你好运! :)
RxJava 对我在编程方面解决了许多问题,并使得许多事情变得更加容易…
优势:
缺点: - 难以测试
这三个接口都用于将生产者的值传输给消费者。消费者可以分为两种类型:
此外,通信接口在其他方面也有所不同:
因此:
Future
使用同步接口传输单个值
CompletableFuture
使用同步和异步接口传输单个值
Rx
使用具有背压的异步接口传输多个值
此外,所有这些通信设施都支持传输异常。但这并非总是如此。例如,BlockingQueue
不支持异常传输。
CompletableFuture相较于普通Future的主要优势在于它利用了强大的Stream API并为您提供回调处理程序来链接任务,而如果使用普通Future则完全不存在这种优势。除此之外,CompletableFuture还提供异步架构,因此在处理计算密集型的map-reduce任务时,CompletableFuture是首选,而不必过多担心应用程序性能。