并行流、收集器和线程安全

54

看下面的简单示例,它统计了列表中每个单词出现的次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最终,wordsCount 的值为 {a=2, b=1, c=1}

但是我的数据流非常大,我希望能够并行处理这个任务,因此我写了以下代码:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

然而,我注意到wordsCount仅仅是一个简单的HashMap,因此我想知道是否需要显式地请求一个并发映射表以确保线程安全性:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非并发收集器能否安全地与并行流一起使用,或者我只应该在从并行流收集时使用并发版本?

3个回答

54

在使用并行流的collect操作时,可以安全地使用非并发收集器,不一定需要使用并发版本。

在Collector接口的规范中的一个带有半打要点的部分,是这样说明的:

对于非并发收集器,在结果供应商、累加器或组合函数返回的任何结果必须串行线程限定。这使得收集可以在并行情况下发生,而无需Collector实现任何额外的同步。Reduction实现必须确保输入被正确分区、分区被隔离处理,并且仅在累积完成后才进行组合。

这意味着,即使某些实现可能不是并发收集器,由Collectors类提供的各种实现也可以与并行流一起使用。这也适用于您自己实现的任何非并发收集器。只要您的收集器不干涉流源,是无副作用的、无序的等,就可以安全地与并行流一起使用。

我建议阅读java.util.stream包文档中的Mutable Reduction部分。在这一部分的中间,有一个被说明可以并行化的示例,但它将结果收集到了一个非线程安全的ArrayList中。
这是因为以非并发收集器结束的并行流确保不同的线程始终在操作中间结果集的不同实例。这就是为什么收集器有一个Supplier函数,用于创建与线程数相同的中间集合,以便每个线程都可以累积到自己的集合中。当需要合并中间结果时,它们会在线程之间安全地交接,任何时候只有一个线程正在合并任何一对中间结果。

27

所有的收集器,如果按照规范执行,都可以安全地并行或顺序运行。并行可读性是设计的关键部分。

并发和非并发收集器之间的区别与并行化方法有关。

普通(非并发)收集器通过合并子结果来操作。因此,源被划分为一堆块,每个块都被收集到结果容器中(例如列表或映射),然后子结果被合并成一个更大的结果容器。这是安全的且保持顺序,但对于某些类型的容器 -- 特别是映射 -- 可能是昂贵的,因为按键合并两个映射通常是昂贵的。

相反,并发收集器创建一个结果容器,其插入操作的线程安全性得到保证,并从多个线程向其中插入元素。对于像ConcurrentHashMap这样高度并发的结果容器,这种方法可能比合并普通HashMap表现得更好。

因此,并发收集器严格是其普通同类的优化。它们也不是没有代价;由于元素来自多个线程,因此并发收集器通常无法保持遇到的顺序。(但是,在创建单词计数直方图时,您通常不在意首次计数的“foo”是哪个实例。)


顶部答案的最后一段似乎在描述你的第三段。你是说那是错误的,实际上应该像(4和5)那样做? - Noumenon
2
@Noumenon 3和4/5之间的区别在于groupingBygroupingByConcurrent之间的区别。前者保证顺序不变,并且可以并行安全,但可能会较慢。后者也是并行安全的,通常并行化更好,但牺牲了顺序保留。程序员必须选择他们的权衡。 - Brian Goetz
谢谢,我之前没有注意到并发和非并发收集器的区别。 - Noumenon

14

使用非并发集合和非原子计数器与并行流是安全的。

如果您查看Stream :: collect的文档,您会发现以下段落:

reduce(Object, BinaryOperator)一样,收集操作可以并行执行而无需额外同步。

对于方法Stream::reduce

虽然与在循环中变异运行总和相比,这可能似乎是更迂回的聚合执行方式,但缩减操作更加平滑地并行化,不需要额外的同步,并且极大地降低了数据竞争的风险。

这可能有点令人惊讶。但是请注意,并行流基于分支-合并模型。这意味着并发执行的工作方式如下:

  • 将序列拆分为大约相同大小的两个部分
  • 单独处理每个部分
  • 收集两个部分的结果并将它们组合成一个结果

在第二步中,子序列递归地应用这三个步骤。

一个例子可以说明这一点。

IntStream.range(0, 4)
    .parallel()
    .collect(Trace::new, Trace::accumulate, Trace::combine);

Trace类唯一的目的是记录构造函数和方法调用。如果您执行此语句,它将打印以下行:

只有目的是记录构造函数和方法调用的Trace类。如果执行这个语句,会输出以下几行:

thread:  9  /  operation: new
thread: 10  /  operation: new
thread: 10  /  operation: accumulate
thread:  1  /  operation: new
thread:  1  /  operation: accumulate
thread:  1  /  operation: combine
thread: 11  /  operation: new
thread: 11  /  operation: accumulate
thread:  9  /  operation: accumulate
thread:  9  /  operation: combine
thread:  9  /  operation: combine

你可以看到,已经创建了四个Trace对象,每个对象都调用了一次accumulate方法,并且使用了三次combine将这四个对象合并成一个。每个对象一次只能被一个线程访问。这使得代码是线程安全的,同样适用于Collectors::toMap方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接