在Java 8中并行流的收集方式

28
我希望您能将输入转化为并行流,然后将输出作为列表。输入可以是任何可应用流的列表或集合。
我的顾虑是,如果我们想要将输出作为映射,则有一种来自Java的选项。
list.parallelStream().collect(Collectors.toConcurrentMap(args))

但我没有看到任何选项可以以线程安全的方式从并行流中收集数据以提供列表输出。我发现还有一个选项可用:

list.parallelStream().collect(Collectors.toCollection(<Concurrent Implementation>))

通过这种方式,我们可以在 collect 方法中提供各种并发实现。但是,我认为 java.util.concurrent 中只有 CopyOnWriteArrayList 列表实现。我们可以在此处使用各种队列实现,但这些实现将不会像列表那样。我的意思是,我们可以通过解决方法来获取列表。

如果我希望输出为列表,最佳方法是什么?请指导一下我。

注:我找不到与此相关的其他帖子,任何参考资料都将有所帮助。


1
你不能只是传递一个 Collections.synchronizedList(new ArrayList<>()) 吗? - Ole V.V.
5
不需要,集合对象不需要线程安全。 - Andreas
2个回答

48
收集数据的Collection对象不需要是并发的。您可以给它一个简单的ArrayList
这是因为从并行流中收集的值的集合实际上并没有被收集到单个的Collection对象中。每个线程将收集自己的数据,然后所有子结果将被合并成一个最终的Collection对象。
这在Collector javadoc中都有详细说明,并且Collector是您提供给collect()方法的参数。
<R,A> R collect(Collector<? super T,A,R> collector)

我想我错过了那部分内容。我的最初理解是我们传递的集合只会收集单个元素。但我现在的问题是,为什么我们需要 Collectors.toConcurrentMap,他们本可以使用简单的哈希映射然后组合并返回。 - Vip
2
@VipulGoyal 这显然是为了优化目的。合并大型 HashMap 可能非常昂贵,而在实现流时已经有了 ConcurrentHashMap,为什么不直接使用它呢? - Eugene
@Eugene,我同意你的观点,合并HashMap确实很昂贵。但是我现在在想,为什么我们没有更好的并发列表实现,而只有CopyOnWriteArrayList这种相当昂贵的实现。那里面存在什么挑战,或者我漏掉了什么?无论如何,我已经得到了我的答案,所有这些都是不同的讨论。 - Vip
3
如果流(输入)和集合(输出)都是有序的,那么并发集合是无法帮助的,因为值必须按顺序收集。但是,如果不必维护顺序,并且集合是并发的,则所有并行线程都可以添加到单个结果集合中,而不是构建需要合并的中间子结果。 - Andreas
@Vipul Goyal:合并两个 HashMap 意味着重新处理一个映射表的所有条目。相反,合并两个 ArrayList 意味着单个纯内存传输。此外,请记住 Collectors.toList() 不指定返回一个 ArrayList,甚至不是可变列表。因此,将来版本可能返回一个不同的 List 实现,在构建时更容易合并,但之后则不可修改... - Holger

15

但是我没有看到可以以线程安全的方式从并行流中收集提供列表作为输出的选项,这完全是错误的。

流的整个设计思路就是使用非线程安全的集合来实现完全有效的线程安全结果。这是由于流的实现方式(这也是流设计的关键部分)。你可以看到Collector定义了一个supplier方法,在每个步骤中都会创建一个新的实例,这些实例将在彼此之间合并。

因此,下面的代码是完全线程安全的:

 Stream.of(1,2,3,4).parallel()
          .collect(Collectors.toList());

由于此流中有4个元素,因此将创建4个ArrayList实例,并在最后合并为一个结果(假设至少有4个CPU核心)

另一方面,像toConcurrent这样的方法会生成一个单一结果容器,所有线程都将把它们的结果放入其中。


1
假设至少有四个CPU核心。 - Holger
@Holger 我正在努力关注细节,但你远远超过了这个水平... :) 非常感谢您的评论! - Eugene
Stream#collect javadoc中提到的部分是什么意思:“如果流是并行的,并且Collector是并发的,那么将执行并发缩减(有关并发缩减的详细信息,请参见Collector。)” Collectors.toList()创建了一个不是并发的Collector实现。那这是什么意思呢? - Jan Krakora
@Behnil,"this mean" 是什么意思?是指回答还是你在评论中的问题?能否澄清一下? - Eugene
2
默认情况下假设至少有5个CPU核心!Stream默认使用ForkJoinPool和ForkJoinPool.commonPool(),默认大小为Runtime.getRuntime().availableProcessors() - 1。 - herburos
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接