流和并行流

7

I have a test code like this :

List<Integer> list = new ArrayList<>(1000000);

for(int i=0;i<1000000;i++){
    list.add(i);
}

List<String> values = new ArrayList<>(1000000);

list.stream().forEach(
    i->values.add(new Date().toString())
);

System.out.println(values.size()); 

运行该代码,得到正确输出:1000000。

然而,如果我将 stream() 改成 parallelStream(), 如下所示:

 list.parallelStream().forEach(
    i->values.add(new Date().toString())
 );

我得到了一个随机输出,例如:920821。
这是怎么回事?

8
你在 ArrayList 上的 add 操作不具备线程安全性! - SMA
parallelStream()使用ForkJoinPool并行运行流,使用向量而不是ArrayList。 - PKR
2
@PKR 使用Vector的缺点是可能比使用一个线程要慢。 - Peter Lawrey
3个回答

13

ArrayList不是同步的。尝试同时向其添加元素是没有定义的。根据forEach

对于并行流管道,此操作不能保证遵守流的相遇顺序,因为这样做将牺牲并行性的好处。对于任何给定的元素,动作可以在库选择的任何时间和任何线程中执行

在您的第二个示例中,您最终会有多个线程同时调用数组列表上的add方法,并且ArrayList文档说:

请注意,此实现未经同步。如果多个线程并发访问ArrayList实例,并且其中至少一个线程在结构上修改了列表,则必须在外部进行同步。

错误的解决方案

如果您将ArrayList的使用更改为Vector,则会得到正确的结果,因为此列表实现已同步。它的Javadoc说:

与新的集合实现不同,Vector是同步的。

然而,请不要使用它!此外,由于显式同步,它可能会变得更慢。

正确的方法

正是为了避免这种情况,Stream API提供了可变减少范例,使用collect方法。以下是

List<String> values = list.stream().map(i -> "foo").collect(Collectors.toList());

无论是否并行运行,Stream管道都将始终提供正确的结果。流管道在内部处理并发,并保证在并行流的收集操作中使用非并发收集器是安全的Collectors.toList() 是一个内置收集器,将 Stream 的元素累积到一个列表中。


3
使用Vector可以确保元素数量正确,但不能保证在使用forEach时元素顺序正确。另一方面,如果使用forEachOrdered来解决这个问题,再次使用ArrayList就可以了,但是在大多数情况下,性能仍然不如顺序流... - Holger

5

使用Consumer时,需要注意线程安全问题。更简单的方法是让Stream API累加结果。

List<String> values = IntStream.range(0, 1_000_000).parallel()
                               .mapToObj(i -> new Date().toString())
                               .collect(Collectors.toList());

避免使用像Vector这样的线程安全集合的一个主要原因是它需要每个线程获取一个共享锁,这会成为瓶颈,即你将花费时间来获取和释放锁,而且每次只能有一个线程可以访问它。你很容易得到一个比单线程还慢的解决方案。

3

values.add(String) 不是线程安全的。如果您在没有同步的情况下从不同的线程调用此方法,则不能保证它能按预期工作。

要解决这个问题,您可以:

  • 使用线程安全集合,如VectorCopyOnWriteArrayList
  • 明确同步您的代码。例如,将synchronize(this){values.add(new Date().toString())}放入您的代码中。请注意,i->在同步块之外。
  • 或者在这种情况下,映射元素以获取新流,就像@PeterLawrey的答案中所示:IntStream.range(0, 1_000_000).parallel().mapToObj(i -> new Date().toString()).collect(Collectors.toList());

仅供参考,CopyOnWriteArrayList 不适用于此类大型集合。 - vsnyc
1
我想表达的是,使用100万条目完成该操作需要很长时间。 - vsnyc

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接