在可能的情况下,我是否应该始终使用并行流?

650

使用Java 8和lambda表达式,迭代集合并将其作为流进行操作变得十分容易,而且同样轻松的还能使用并行流。以下是来自文档中的两个示例,第二个示例使用了parallelStream:

myShapesCollection.stream()
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));

myShapesCollection.parallelStream() // <-- This one uses parallel
    .filter(e -> e.getColor() == Color.RED)
    .forEach(e -> System.out.println(e.getName()));
只要不关心顺序,使用并行流总是有益的吗?人们可能认为将工作分配到更多核心上会更快。

还有其他的考虑因素吗?什么时候应该使用并行流,什么时候应该使用非并行流?(这个问题是用来引发讨论如何以及何时使用并行流,而不是因为我认为总是使用它们是一个好主意。)

6个回答

884

并行流与顺序流相比,开销要高得多。协调线程需要大量时间。我会默认使用顺序流,只有在以下情况下才考虑并行流:

  • 我有大量需要处理的项目(或者每个项目的处理需要时间且可以并行化)

  • 我本来就有性能问题

  • 我尚未在多线程环境中运行该进程(例如:在Web容器中,如果我已经有许多请求并行处理,那么在每个请求内部添加额外的并行层可能会产生更多负面效果而非积极效果)

在您的例子中,性能将受到对 System.out.println() 的同步访问所驱动,将此过程并行化将没有效果,甚至可能产生负面效果。

此外,请记住,并行流不能神奇地解决所有同步问题。如果谓词和函数在过程中使用共享资源,则必须确保所有内容都是线程安全的。特别是,如果您进行并行处理,副作用是您真正需要担心的事情。

无论如何,要进行测量,不要猜测!只有测量才能告诉您并行性是否值得。


29
好的回答。我想补充一点,如果你有大量的项目需要处理,这只会增加线程协调问题;只有当每个项目的处理需要时间且可并行化时,才有可能使用并行化。 - Warren Dew
23
@WarrenDew 我不同意。Fork/Join系统只是将N个项目分成4个部分,然后按顺序处理这4个部分。然后将4个结果进行合并。如果任务很大,即使对于快速单元处理,并行化也可能是有效的。但是像往常一样,你必须进行测量。 - JB Nizet
1
@JBNizet 如果4个部分按顺序处理,那么它是并行处理还是顺序处理没有区别,对吗?请澄清一下。 - Harshana
5
他的意思显然是每个4部分中的元素将被依次处理,但是这些部分本身可以同时进行处理。换句话说,如果您有多个CPU核心可用,每个部分可以在独立于其他部分的情况下运行在自己的核心上,并按顺序处理其自身的元素。(注意:我不知道并行Java流是如何工作的,我只是想澄清JBNizet所说的内容。) - tomorrow
@JBNizet 的回答非常完美!这是在代码中添加并行流之前要访问的检查清单。 - Gaurav
显示剩余5条评论

299

Stream API旨在让编写计算变得容易,这种方式与执行方式无关,可以轻松地在顺序和并行之间切换。

但是,仅仅因为它很容易,并不意味着随便在代码中添加.parallel()总是一个好主意,事实上,这是个不好的想法。

首先,需要注意的是,并行处理除了当有更多的核心可用时能够提供更快的执行速度外,没有其他好处。 并行执行始终涉及更多的工作,因为除了解决问题外,还必须执行子任务的分派和协调工作。 希望通过在多个处理器上分解工作来更快地得到答案;是否会真正发生这种情况取决于很多因素,包括您的数据集的大小,每个元素上执行的计算量,计算的性质(特别是一个元素的处理是否与其他元素的处理相互作用),可用处理器数量以及竞争这些处理器的其他任务数量。

此外,需要注意的是,并行处理通常也会暴露计算中的非确定性,这在顺序执行中通常是隐藏的;有时这并不重要,或者可以通过限制涉及的操作来缓解(即,约减运算符必须是无状态且可结合的)。

实际上,有时并行处理将加速您的计算,有时不会,甚至有时会减慢它。最好首先使用顺序执行进行开发,然后仅在以下情况应用并行处理:

(A) 您知道增加性能确实有好处;

(B) 它确实会提供增加的性能。

(A) 是一个商业问题,而不是技术问题。如果您是性能专家,通常可以查看代码并确定(B),但聪明的做法是进行测量。(如果代码已经足够快,则最好将你的大脑周期应用于其他领域,甚至不要考虑并行处理。)

最简单的并行性性能模型是“NQ”模型,其中N是元素数量,Q是每个元素的计算。一般来说,您需要将NQ乘积超过某个阈值才能开始获得性能优势。对于像“将数字从 1 加到 N ”这样的低Q问题,通常在N = 1000 N = 10000 之间会看到成本收支平衡点。对于高Q问题,您会在更低的阈值处看到成本收支平衡点。

但现实相当复杂。因此,在达到专业水平之前,首先要确定顺序处理实际上是否会给您带来成本,然后再测量并行性是否有所帮助。


22
本文介绍了NQ模型的更多细节内容:http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html - Pino
4
@specializt说:将流从顺序切换为并行在大多数情况下确实会改变算法。此处提到的确定性是指您(任意)运算符可能依赖的属性(Stream实现无法知道),但当然不应该依赖。这就是本回答中该部分试图表达的内容。如果您关心规则,您可以获得确定性结果,就像您所说的那样(否则并行流将非常无用),但也有故意允许的不确定性可能性,例如使用findAny而不是findFirst时... - Holger
6
首先,需要注意的是并行性除了在有更多核心可用时可能提供更快的执行速度外,没有其他任何好处。如果您正在执行涉及IO操作的操作(例如myListOfURLs.stream().map((url) -> downloadPage(url))...),也是如此。 - Jules
7
@Pacerier,这是一个不错的理论,但很遗憾太天真了(可以看一下三十年来尝试构建自动并行编译器的历史)。由于我们很难猜测正确足够多的时间,以免在我们必然出错时使用户感到烦恼,负责任的做法是让用户说出他们想要什么。对于大多数情况,顺序执行是正确的选择,并且更加可预测。 - Brian Goetz
3
@Jules:永远不要使用并行流进行IO操作。它们仅适用于CPU密集型操作。并行流使用ForkJoinPool.commonPool(),您不希望阻塞任务进入该池。 - R2C2
显示剩余6条评论

86

我观看了Brian Goetz演示之一,他详细解释了在进行并行化之前需要考虑的以下四个要点:

分裂/分解成本
- 有时分裂比只做工作还要昂贵!
任务分派/管理成本
- 在将工作交给另一个线程的时间内可以完成大量工作。
结果组合成本
- 有时组合涉及复制大量数据。例如,添加数字很便宜,而合并集合很昂贵。
局部性
- 这是一个重要的问题,每个人都可能会忽略。你应该考虑缓存未命中,如果CPU由于缓存未命中而等待数据,那么并行化就没有任何好处。这就是为什么基于数组的源代码并行化得最好,因为接下来的索引(靠近当前索引)被缓存,CPU体验缓存未命中的机会更少。

他还提到了一个相对简单的公式来确定并行加速的机会。

NQ模型

N x Q > 10000

其中,
N表示数据项的数量
Q表示每个数据项需要处理的工作量


20
“每件物品的工作量”以什么单位衡量?10000代表什么? - Nikolas Charalambidis

5

永远不要在具有限制的无限流上并行化处理。以下是会发生的事情:

    public static void main(String[] args) {
        // let's count to 1 in parallel
        System.out.println(
            IntStream.iterate(0, i -> i + 1)
                .parallel()
                .skip(1)
                .findFirst()
                .getAsInt());
    }

结果

    Exception in thread "main" java.lang.OutOfMemoryError
        at ...
        at java.base/java.util.stream.IntPipeline.findFirst(IntPipeline.java:528)
        at InfiniteTest.main(InfiniteTest.java:24)
    Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.base/java.util.stream.SpinedBuffer$OfInt.newArray(SpinedBuffer.java:750)
        at ...

如果您使用.limit(...),同样的道理也适用。

这里有个解释: Java 8中,在流中使用.parallel会导致OOM错误

同样地,如果流是有序的并且要处理的元素比您想要的多得多,请不要使用.parallel,例如:

public static void main(String[] args) {
    // let's count to 1 in parallel
    System.out.println(
            IntStream.range(1, 1000_000_000)
                    .parallel()
                    .skip(100)
                    .findFirst()
                    .getAsInt());
}

由于并行线程可能在大量数字范围上工作而不是关键的0-100范围,因此这可能需要更长时间才能运行。


4
其他回答已经涵盖了优化前期和并行处理中的开销成本。这个回答说明了用于并行流的理想数据结构的选择。
通常情况下,通过并行处理获得的性能提升最好的数据结构是 ArrayListHashMapHashSetConcurrentHashMap 实例; 数组;int 范围;以及 long 范围。这些数据结构的共同之处是它们都可以准确且廉价地分成任意所需大小的子范围,这使得将工作划分给并行线程变得容易。流库用于执行此任务的抽象是 spliterator,它由 StreamIterable 上的 spliterator 方法返回。
所有这些数据结构共同具有的另一个重要因素是,在顺序处理时,它们都提供良好至极佳的引用局部性:顺序元素引用在内存中一起存储。这些引用所指向的对象可能不会在内存中彼此靠近,这降低了引用局部性。引用局部性结果证明对于批量操作的并行化至关重要:如果没有它,线程会花费大量时间处于空闲状态,等待数据从内存传输到处理器的高速缓存中。具有最佳引用局部性的数据结构是原始数组,因为数据本身在内存中是连续存储的。
来源:《Effective Java 3e》Joshua Bloch 的第 48 条“谨慎使用流的并行模式”。

3
Collection.parallelStream()是一种很好的并行处理方式。但需要注意,它实际上使用一个共同的线程池,内部只有少量工作线程(默认情况下,线程数等于CPU核心数),请参阅ForkJoinPool.commonPool()。如果池中的某些任务是长时间的I/O绑定工作,则其他可能快速的parallelStream调用将被卡住等待空闲池线程。这显然需要fork-join任务是非阻塞和短或者说是cpu-bound。为了更好地理解细节,强烈建议仔细阅读java.util.concurrent.ForkJoinTask javadoc,以下是一些相关引用:

ForkJoinTasks的效率源于...它们主要用作计算纯函数或操作纯隔离对象的计算任务。

计算应尽可能避免同步方法或块,并应最小化其他阻塞同步

可分割任务也不应执行阻塞I/O

这些指示了parallelStream()任务的主要目的是短暂计算内存结构。还建议查看文章Common parallel stream pitfalls


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接