RxJava调度器的使用场景

257
在RxJava中,有5种不同的调度器可供选择:
  1. immediate():创建并返回一个调度器,该调度器立即在当前线程上执行工作。

  2. trampoline():创建并返回一个调度器,该调度器将工作排队在当前线程上,在当前工作完成后执行。

  3. newThread():创建并返回一个调度器,每个工作单元都会创建一个新线程。

  4. computation():创建并返回一个用于计算工作的调度器。这可用于事件循环、处理回调和其他计算工作。请勿在此调度器上执行IO绑定工作。请改用Schedulers.io()

  5. io():创建并返回一个用于IO绑定工作的调度器。 实现由一个Executor线程池支持,该线程池将根据需要增长。这可用于异步执行阻塞IO。请勿在此调度器上执行计算工作。请改用Schedulers.computation()

问题:

前三个调度器相当容易理解,但是我对计算io有些困惑。
1. “IO-bound work”究竟是什么?它用于处理流(java.io)和文件(java.nio.files)吗?它用于数据库查询吗?它用于下载文件或访问REST API吗?
2. computation()newThread()有何不同?所有的computation()调用都在单个(后台)线程上执行,而不是每次都在新的(后台)线程上执行吗?
3. 在进行IO工作时为什么不能调用computation()
4. 在进行计算工作时为什么不能调用io()
3个回答

341

很好的问题,我认为文档需要更详细的说明。

  1. io()由无界线程池支持,适用于非计算密集型任务,即不会对CPU产生大量负载的任务。所以与文件系统交互、与不同主机上的数据库或服务进行交互都是很好的例子。
  2. computation()由大小等于可用处理器数量的有界线程池支持。如果您试图在多个可用处理器之间并行调度CPU密集型工作(例如使用newThread()),那么您将面临线程创建开销和上下文切换开销,因为线程争夺处理器,这可能会带来巨大的性能损失。
  3. 最好只将computation()用于CPU密集型工作,否则您将无法获得良好的CPU利用率。
  4. 基于讨论中提到的原因,调用io()进行计算工作是不好的。io()是无界的,如果您并行调度了1000个计算任务到io(),则每个任务都将拥有自己的线程,并竞争CPU,产生上下文切换费用。

7
通过对 RxJava 源代码的熟悉,我长期以来一直感到困惑,并且认为文档在这方面需要加强。 - Dave Moten
4
你能否举个trampoline()实用的例子?我理解这个概念,但是无法想象它在实践中的应用场景。这是唯一一个对我仍然神秘的调度器。 - tmn
2
如果被调度的工作还会安排更多的工作,那么使用immediate()最终会导致StackOverflowError。而trampoline()通过排队避免了这种情况。 - Dave Moten
32
进行网络调用时,请使用Schedulers.io(),如果需要限制同时进行的网络调用数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。 - Dave Moten
4
你可能认为默认在 computation() 函数上设置 timeout 会阻塞线程,但事实并非如此。在内部,computation() 使用了一个 ScheduledExecutorService,因此延迟的操作不会阻塞线程。鉴于这个事实,使用 computation() 是一个好主意,因为如果它在另一个线程上执行,我们将面临线程切换成本。 - Dave Moten
显示剩余8条评论

5
最重要的一点是,无论是 Schedulers.io 还是 Schedulers.computation,都由无界线程池支持,而不是问题中提到的其他调度器。这种特性只有在使用 newCachedThreadPool (具有自动回收线程池的无界) 创建的 Executor 的情况下,Schedulers.from(Executor) 才会共享。
正如之前的回答和网络上的多篇文章所充分解释的那样,Schedulers.ioSchedulers.computation 应谨慎使用,因为它们针对其名称的工作类型进行了优化。但就我看来,它们最重要的作用是为反应式流提供实际并发性。
与初学者的想法相反,反应式流本质上不是并发的,而是异步且顺序执行的。基于这个原因,只有当 I/O 操作是阻塞的时候(例如:使用一个阻塞命令,如 Apache IOUtils 的 FileUtils.readFileAsString(...)),才应该使用 Schedulers.io,否则将会使调用线程被冻结直到操作完成。
如果使用诸如 Java AsynchronousFileChannel(...) 等异步方法,则在操作期间不会阻塞调用线程,因此没有必要使用单独的线程。实际上,Schedulers.io 线程不适用于异步操作,因为它们并没有运行事件循环,回调也将永远不会被调用。
同样的逻辑也适用于数据库访问或远程 API 调用。如果可以使用异步或反应式 API 进行调用,则不要使用 Schedulers.io
回到并发性问题。您可能无法访问异步或反应式 API 以异步或并发地执行 I/O 操作,因此您的唯一选择是在单独的线程上分派多个调用。但是,好消息是,虽然反应式流在其末端是顺序执行的,但是flatMap() 运算符可以在其核心引入并发性
必须在流构造中内置并发性,通常使用 flatMap() 运算符。该强大的运算符可以配置为在嵌入式 Function<T, R> 中为您提供一个多线程上下文。该上下文由多线程调度器(如 Scheduler.ioScheduler.computation)提供。
请参阅关于 RxJava2 的文章,了解有关 SchedulersConcurrency 的更多详细信息,在那里您将找到有关如何按顺序和并发地使用 Schedulers 的代码示例和详细说明。
希望这可以帮到您,
Softjake

3

这篇博客提供了一个很好的答案

来自博客文章:

Schedulers.io()由无界线程池支持。它用于非CPU密集型I/O类型工作,包括与文件系统交互、执行网络调用、数据库交互等。该线程池旨在用于异步执行阻塞IO。

Schedulers.computation()由有界线程池支持,大小为可用处理器数量。它用于计算或CPU密集型工作,例如调整图像大小、处理大型数据集等。请注意:当您分配比可用内核更多的计算线程时,性能会因上下文切换和线程创建开销而降低,因为线程争夺处理器时间。

Schedulers.newThread()为每个计划的工作单元创建一个新线程。此调度程序是昂贵的,因为每次都会产生新线程,不会发生重用。

Schedulers.from(Executor executor) 创建并返回一个由指定执行器支持的自定义调度程序。为了限制线程池中同时存在的线程数量,可以使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果任务在所有线程都被占用时被调度,它将被排队。线程池中的线程将一直存在,直到显式关闭。

Main线程或AndroidSchedulers.mainThread()是由RxAndroid扩展库提供给RxJava的。Main线程(也称为UI线程)是用户交互发生的地方。应该注意不要过载此线程,以防止出现卡顿、无响应的UI或更糟糕的“应用程序未响应”(ANR)对话框。

Schedulers.single() 是RxJava 2中新增的。此调度程序由单个线程支持,按请求顺序顺序执行任务。

Schedulers.trampoline()通过其中一个参与工作线程以FIFO(先进先出)方式执行任务。通常在实现递归时使用,以避免增加调用堆栈。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接