Java并行流的性能影响

5

使用.stream().parallel()的最佳实践是什么?

例如,如果您有一堆阻塞I/O调用,并且想要检查.anyMatch(...),并行执行似乎是一个明智的选择。

示例代码:

public boolean hasAnyRecentReference(JobId jobid) {
  <...>
  return pendingJobReferences.stream()
     .parallel()
     .anyMatch(pendingRef -> { 
       JobReference readReference = pendingRef.sync();
       Duration referenceAge = timeService.timeSince(readReference.creationTime());
       return referenceAge.lessThan(maxReferenceAge)
     });
}

乍一看,这似乎是个明智的做法,因为我们可以并发执行多个阻塞读取,因为我们只关心任何一项匹配而不是逐个检查(所以如果每个读取需要50毫秒,我们只需等待(50毫秒 * 期望的非最近引用数) / 线程数)。

在生产环境中引入此代码是否会对代码库中的其他部分产生未预料的性能影响?

1个回答

5

编辑:正如@edharned所指出的,.parallel()现在使用CountedCompleter而不是调用.join(),这本身也存在问题,正如Ed在http://coopsoft.com/ar/Calamity2Article.html的“当前正在做什么?”一节中所解释的那样。

我认为下面的信息仍然有用,可以帮助了解为什么分叉-加入框架很棘手,以及在结论中提出的替代方法仍然相关。


虽然代码的精神是正确的,但实际代码可能会对所有使用.parallel()的代码产生系统范围的影响,即使这并不明显。

不久前,我发现了一篇文章,建议不要这样做:https://dzone.com/articles/think-twice-using-java-8,但我直到最近才深入挖掘。

以下是我阅读后的想法:

  1. .parallel()在Java中使用ForkJoinPool.commonPool(),这是一个单例的ForkJoinPool,由所有流共享(ForkJoinPool.commonPool()是一个公共静态方法,因此理论上其他库/代码部分也可能使用它)
  2. ForkJoinPool实现了工作窃取并拥有除共享队列外的每个线程队列

    1. 工作窃取意味着当线程空闲时,它会寻找更多的工作
    2. 起初我认为:按照这个定义,cached线程池是否也会做工作窃取(即使一些参考资料称其为缓存线程池的工作共享)?
    3. 事实证明,在使用空闲单词时似乎存在一些术语模糊:

      1. cached线程池中,仅在线程完成其任务后才会空闲。如果它被阻止等待阻塞调用,则不会变为空闲状态
      2. forkjoin线程池中,当线程完成其任务或在子任务上调用.join()方法(这是一个特殊的阻塞调用)时,线程处于空闲状态。

        当在子任务上调用.join()时,线程在等待该子任务完成时变为空闲状态。在空闲状态下,它将尝试执行任何其他可用任务,即使它在另一个线程的队列中(它窃取工作)。

        [这是重要的]一旦找到另一个要执行的任务,在恢复其原始执行之前,必须完成它,即使它正在执行被窃取任务时子任务已经完成。

        [这也很重要]此工作窃取行为仅适用于调用.join()的线程。如果线程被阻塞在其他地方,如I/O,它就会空闲(即它不会窃取任务)。

    4. Java流不允许您提供自定义ForkJoinPool,但https://github.com/amaembo/streamex可以

    我花了一些时间才理解2.3.2的影响,因此我将举一个快速示例以帮助说明问题:

    注:这些是虚拟示例,但您可以使用流进入等效情况,而不会意识到它,因为内部执行分叉和加入操作。

    此外,我将使用极其简化的伪代码,仅用于说明.parallel()问题,但在其他情况下可能并不合理。

    假设我们正在实现归并排序

merge_sort(list):
    left, right = split(list)

    leftTask = mergeSortTask(left).fork()
    rightTask = mergeSortTaks(right).fork()

    return merge(leftTask.join(), rightTask.join())

现在假设我们有另一段代码,它执行以下操作:

dummy_collect_results(queriesIds):
   pending_results = []

   for id in queriesIds: 
     pending_results += longBlockingIOTask(id).fork()

  // do more stuff

这里发生了什么?

当你编写归并排序代码时,你认为排序调用不执行任何 I/O 操作,因此它们的性能应该是非常确定的,对吗?

没错。但你可能没有预料到的是,由于 dummy_collect_results 方法创建了一堆长时间运行和阻塞的子任务,当线程在 .join() 上阻塞等待子任务完成时,它们可能会开始执行其中一个长时间阻塞的子任务。

这很糟糕,因为如上所述,一旦长时间阻塞(在 I/O 上而不是 .join() 调用上),被窃取的线程必须完成它,无论通过 .join() 等待的子任务是否在阻塞 I/O 期间完成。

这使得归并排序任务的执行不再确定,因为执行这些任务的线程可能最终窃取在完全不同的代码中生成的 I/O 密集型任务。

这也非常可怕且难以捕捉,因为你可以在整个代码库中使用 .parallel() 而没有任何问题,只需要一个类在使用 .parallel() 时引入了长时间运行的任务,所有其他部分的代码基础可能会出现不一致的性能问题。

因此,我的结论是:

  1. 理论上,如果你可以保证代码中任何地方创建的所有任务都很短,则 .parallel() 是可以接受的。
  2. .parallel() 可能对整个系统的性能产生非明显的影响(例如,如果以后添加了一个使用 .parallel() 并具有长任务的代码片段,则可能会影响使用 .parallel() 的所有代码的性能)。
  3. 由于第 2 点,最好完全避免使用 .parallel(),而是使用 ExecutorCompletionService 或使用 https://github.com/amaembo/streamex,它允许你提供自己的 ForkJoinPool(这允许更多的隔离)。更好的是,你可以使用 https://github.com/palantir/streams/blob/1.9.1/src/main/java/com/palantir/common/streams/MoreStreams.java#L53,它给你更精细的控制并发机制的方式。

1
parallel() 使用 CountedCompleter 类,而不是 join()。正如您所指出并且我最初写的那样,join() 的效果并不如广告所述。 - edharned

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接