RxJava - 调度器与ExecutorService有何区别?

18

我有一个预感,在RxJava中,对于高度计算和并行化的任务,传统的ExecutorServiceScheduler更快。

我有一个理论,认为这段代码

Observable<MyItem> source = ...

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.computation()))
.subscribe();

会比这个运行得慢

final ExecutorService svc = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
Observable<MyItem> source = ...

source.flatMap(myItem -> myItem.process().subscribeOn(Schedulers.from(svc)))
.finallyDo(svc::shutdown)
.subscribe();

我将这两种方法与我在工作中通常执行的并行流程进行了比较,得到了以下结果。

EXECUTOR

START: 2016-01-25T09:47:04.350
END: 2016-01-25T09:48:37.181
TOTAL TIME (SEC): 92


COMPUTATION SCHEDULER

START: 2016-01-25T09:50:37.799
END: 2016-01-25T09:54:23.674
TOTAL TIME (SEC): 225

我的初步测试表明,传统的ExecutorServiceScheduler在计算方面要快得多。

这些结果有原因吗?是RxJava调度程序没有针对并行处理进行优化吗?我已经得出的印象是,计算调度程序使用的线程比Executors少。


1
在这两种情况下,它都没有并行处理项目。如果您想进行并行执行,应该检查实验性库RxJavaParallel - Dan Lew
дёәд»Җд№ҲExecutorServiceжҜ”Schedulers.computation()жӣҙеҝ«пјҢиҝҷд»Қ然жҳҜдёҖдёӘеҘҪй—®йўҳгҖӮжҲ‘жІЎжңүиө„ж јеӣһзӯ”иҝҷдёӘй—®йўҳгҖӮ - Dan Lew
3
David Karnok在我的RxJava并行化文章上发表评论,他说:“如果你有8个核心,这只会使用其中的5个。如果你有4个核心,两组将共享同一个核心。” 我很好奇为什么RxJava调度器在创建线程和利用处理器方面如此保守。 - tmn
3
实际上,它会并行处理项目,因为subscribeOn() 出现在 flatMap() 中。 - tmn
1
我猜是因为 Schedulers.computation 为每个工作线程使用 Executors.newScheduledThreadPool(1, threadFactory) - zsxwing
显示剩余4条评论
2个回答

15

3
当您使用Schedulers.computation()时,所有事件都在同一个线程中处理。您可以参考源代码CachedThreadScheduler.javaNewThreadWorker.java。此实现的好处是,如果事件A在事件B之后发出,则事件A将在事件B之后处理。
当您使用Schedulers.from()时,事件将在不同的线程中处理。

刚刚检查了computation()调度程序 - 仍然有并行执行。RxJava2的文档表示,与CPU数量相同的单线程实例有很多。 - pratclot

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接