编辑:正如@edharned所指出的,.parallel()
现在使用CountedCompleter
而不是调用.join()
,这本身也存在问题,正如Ed在http://coopsoft.com/ar/Calamity2Article.html的“当前正在做什么?”一节中所解释的那样。
我认为下面的信息仍然有用,可以帮助了解为什么分叉-加入框架很棘手,以及在结论中提出的替代方法仍然相关。
虽然代码的精神是正确的,但实际代码可能会对所有使用.parallel()
的代码产生系统范围的影响,即使这并不明显。
不久前,我发现了一篇文章,建议不要这样做:https://dzone.com/articles/think-twice-using-java-8,但我直到最近才深入挖掘。
以下是我阅读后的想法:
.parallel()
在Java中使用ForkJoinPool.commonPool()
,这是一个单例的ForkJoinPool
,由所有流共享(ForkJoinPool.commonPool()
是一个公共静态方法,因此理论上其他库/代码部分也可能使用它)
ForkJoinPool
实现了工作窃取并拥有除共享队列外的每个线程队列
- 工作窃取意味着当线程空闲时,它会寻找更多的工作
- 起初我认为:按照这个定义,
cached
线程池是否也会做工作窃取(即使一些参考资料称其为缓存线程池的工作共享)?
事实证明,在使用空闲单词时似乎存在一些术语模糊:
- 在
cached
线程池中,仅在线程完成其任务后才会空闲。如果它被阻止等待阻塞调用,则不会变为空闲状态
在forkjoin
线程池中,当线程完成其任务或在子任务上调用.join()
方法(这是一个特殊的阻塞调用)时,线程处于空闲状态。
当在子任务上调用.join()
时,线程在等待该子任务完成时变为空闲状态。在空闲状态下,它将尝试执行任何其他可用任务,即使它在另一个线程的队列中(它窃取工作)。
[这是重要的]一旦找到另一个要执行的任务,在恢复其原始执行之前,必须完成它,即使它正在执行被窃取任务时子任务已经完成。
[这也很重要]此工作窃取行为仅适用于调用.join()
的线程。如果线程被阻塞在其他地方,如I/O,它就会空闲(即它不会窃取任务)。
Java流不允许您提供自定义ForkJoinPool,但https://github.com/amaembo/streamex可以
我花了一些时间才理解2.3.2
的影响,因此我将举一个快速示例以帮助说明问题:
注:这些是虚拟示例,但您可以使用流进入等效情况,而不会意识到它,因为内部执行分叉和加入操作。
此外,我将使用极其简化的伪代码,仅用于说明.parallel()问题,但在其他情况下可能并不合理。
假设我们正在实现归并排序
merge_sort(list):
left, right = split(list)
leftTask = mergeSortTask(left).fork()
rightTask = mergeSortTaks(right).fork()
return merge(leftTask.join(), rightTask.join())
现在假设我们有另一段代码,它执行以下操作:
dummy_collect_results(queriesIds):
pending_results = []
for id in queriesIds:
pending_results += longBlockingIOTask(id).fork()
// do more stuff
这里发生了什么?
当你编写归并排序代码时,你认为排序调用不执行任何 I/O 操作,因此它们的性能应该是非常确定的,对吗?
没错。但你可能没有预料到的是,由于 dummy_collect_results
方法创建了一堆长时间运行和阻塞的子任务,当线程在 .join()
上阻塞等待子任务完成时,它们可能会开始执行其中一个长时间阻塞的子任务。
这很糟糕,因为如上所述,一旦长时间阻塞(在 I/O 上而不是 .join()
调用上),被窃取的线程必须完成它,无论通过 .join()
等待的子任务是否在阻塞 I/O 期间完成。
这使得归并排序任务的执行不再确定,因为执行这些任务的线程可能最终窃取在完全不同的代码中生成的 I/O 密集型任务。
这也非常可怕且难以捕捉,因为你可以在整个代码库中使用 .parallel()
而没有任何问题,只需要一个类在使用 .parallel()
时引入了长时间运行的任务,所有其他部分的代码基础可能会出现不一致的性能问题。
因此,我的结论是:
- 理论上,如果你可以保证代码中任何地方创建的所有任务都很短,则
.parallel()
是可以接受的。
.parallel()
可能对整个系统的性能产生非明显的影响(例如,如果以后添加了一个使用 .parallel()
并具有长任务的代码片段,则可能会影响使用 .parallel()
的所有代码的性能)。
- 由于第 2 点,最好完全避免使用
.parallel()
,而是使用 ExecutorCompletionService
或使用 https://github.com/amaembo/streamex,它允许你提供自己的 ForkJoinPool
(这允许更多的隔离)。更好的是,你可以使用 https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它给你更精细的控制并发机制的方式。