并行流与流的行为不同

14

我不明白为什么并行流和流在执行相同语句时会产生不同的结果。

    List<String> list = Arrays.asList("1", "2", "3");
    String resultParallel = list.parallelStream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();
    System.out.println("ResultParallel: " + resultParallel);

    String result = list.stream().collect(StringBuilder::new,
            (response, element) -> response.append(" ").append(element),
            (response1, response2) -> response1.append(",").append(response2.toString()))
            .toString();

    System.out.println("Result: " + result);

结果并行:1、2、3

结果:1 2 3

有人可以解释一下为什么会出现这种情况,以及我如何让非并行版本产生与并行版本相同的结果吗?


在并行流中,您可以使用逗号(,)将两个响应连接起来,但在非并行流的情况下,它总是响应和元素,因为它始终获取单个元素。 - random_user
1
顺便提一下,可以使用 Collectors.joining 来实现相同的功能。 - Eugene
3个回答

12

Java 8的Stream.collect方法具有以下签名:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

BiConsumer<R, R> combiner仅在并行流中调用时(为了将部分结果组合成单个容器),因此您的第一段代码片段的输出为:

ResultParallel: 1, 2, 3

在顺序版本中,不会调用combiner(请参见此answer),因此将忽略以下语句:
(response1, response2) -> response1.append(",").append(response2.toString())

结果不同:

1 2 3

如何修复?请查看@Eugene的答案或者这个问题和答案


1
不是我不相信您,但是否有文件或文档可以解释这个问题? - Slaw
3
这是来自 Stuart Marks 的一个 答案 - Oleksandr Pyrohov
1
@Slaw - 请看我的回答。Javadoc提供了一些提示,尽管它并没有直接解释清楚。 - Stephen C

8
为了理解为什么出现错误,可以参考javadoc中的内容。其中,accumulator是一个无状态函数,它将元素合并到结果容器中;combiner是另一个无状态函数,它接受两个部分结果容器并将它们合并在一起,必须与累加器函数兼容。这意味着使用“累加”或“组合”或两者的组合收集元素都没有关系。但是,在您的代码中,累加器和组合器使用不同的分隔符进行连接。从而导致根据使用顺序流还是并行流会得到不一致的结果。
具体来说:
  • 当使用并行流时,流被分成子流以由不同线程处理。这导致每个子流有一个单独的集合。然后将集合合并。
  • 当使用顺序流时,流不会被分割。相反,流被简单地累加到单个集合中,不需要进行任何组合。

观察:
  • In general, for a stream of this size performing a simple transformation, parallelStream() is liable to make things slower.

  • In this specific case, the bottleneck with the parallelStream() version will be the combining step. That is a serial step, and it performs the same amount of copying as the entire serial pipeline. So, in fact, parallelization is definitely going to make things slower.

  • In fact, the lambdas do not behave correctly. They add an extra space at the start, and double some spaces if the combiner is used. A more correct version would be:

    String result = list.stream().collect(
        StringBuilder::new,
        (b, e) -> b.append(b.isEmpty() ? "" : " ").append(e),
        (l, r) -> l.append(l.isEmpty() ? "" : " ").append(r)).toString();
    
  • The Joiner class is a far simpler and more efficient way to concatenate streams. (Credit: @Eugene)


1 - 在这种情况下,子流每个只有一个元素。对于更长的列表,通常会获得与工作线程数量相同的子流,并且子流将包含多个元素。


1
我想在观察部分补充说明,整个代码并不是真正必需的,可以使用Collectors.joining - Eugene
是的。有一种很大的诱惑去尝试使用Java 8流来做“聪明的事情”...只会发现最终结果更慢且更难理解。但说实话,我不明白OP在这里想要实现什么。(为什么让累加器和组合器不兼容?你期望会发生什么?) - Stephen C
我通常尝试尽可能简单地完成事情,但由于我对流和收集器API的完全了解还不够,因为我仍在学习中,所以我可能使这个过程比必要的更加复杂。使用流,如何在列表值之间添加逗号并将结果作为字符串返回是最好的方法? - Claudiga

7
作为旁注,即使您在“combiner”中将 , 替换为空格,您的结果仍将有所不同(稍微更改了代码以使其更易读):
String resultParallel = list.parallelStream().collect(
            StringBuilder::new,
            (builder, elem) -> builder.append(" ").append(elem),
            (left, right) -> left.append(" ").append(right)).toString();

    String result = list.stream().collect(
            StringBuilder::new,
            (builder, elem) -> builder.append(" ").append(elem),
            (left, right) -> left.append(" ").append(right)).toString();


  System.out.println("ResultParallel: ->" + resultParallel + "<-"); // -> 1  2  3  4<-
  System.out.println("Result: ->" + result + "<-"); // -> 1 2 3 4<-

请注意您有一些过多的空格。
Java文档中有提示:
组合器(combiner)必须与累加器函数兼容。
如果您想要连接,还有更简单的选项:
String.join(",", yourList)
yourList.stream().collect(Collectors.joining(","))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接