并行流中的流操作会等待前一个流操作处理完所有元素吗?

3

我想知道并行流中一个流水线的流操作是否等待前一个流操作完成对所有流元素的处理。

如果我有以下流水线:

List <String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .map(String::valueOf)
    .map(integerAsString => {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString => {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

在流中对元素进行操作时,是否可能正在为元素X调用System.out.println("First print statement: " + integerAsString),而仍在为另一个元素Y执行String::parseInt操作?

这段代码的输出如下:

第一行打印语句:1
第一行打印语句:2
第一行打印语句:3
第二行打印语句:1
第二行打印语句:2
第一行打印语句:4
第二行打印语句:3
第二行打印语句:4
第一行打印语句:5
第二行打印语句:5

还是总是像这样:

第一行打印语句:1
第一行打印语句:2
第一行打印语句:3
第一行打印语句:4
第一行打印语句:5
第二行打印语句:1
第二行打印语句:2
第二行打印语句:3
第二行打印语句:4
第二行打印语句:5


1
为什么这很重要?Stream 应该没有副作用,所以我们不应该关心它。 - Turing85
副作用是不被鼓励的,但并非禁止。然而我同意这一点。只是有一天我在想顺序的问题。 - Titulum
这些操作在顺序流中甚至不以那种方式工作。此外,.map(String::parseInt) 没有意义。将 int 转换为 String 不是解析,因此不存在这样的方法。 - Holger
parseInt 确实不存在,是我的错误。我已经更新了代码以使用正确的方法。 - Titulum
2个回答

4

是的,它可以。Intermediate阶段可以以任何顺序执行,terminal操作有一个定义好的顺序,如果流的源具有顺序(不像Set),并且流本身不改变该顺序(调用unordered - 尽管目前这没什么用)。

也就是说,在给定的时间点上,您真的不知道一个阶段中的元素将流经哪个阶段,对于并行流,没有处理元素的顺序。

更大的问题是:为什么您要关心呢?中间操作应该是无副作用的,并依赖于任何顺序都是一个坏主意。


副作用是不被鼓励的,但并非被禁止。我同意这一点。只是有一天我在想顺序的问题。 - Titulum
@Titulum 没错,我知道的最著名的副作用就是 _distinctBy_。 - Eugene

4

处理顺序无法保证,即使是顺序流也是如此。仅当数据具有顺序时,最终结果才会与遇到的顺序一致。

当您运行以下顺序代码时:

List<String> parsedNumbers = IntStream.range(1, 6)
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

它将会打印

First print statement: 1
Second print statement: 1
First print statement: 2
Second print statement: 2
First print statement: 3
Second print statement: 3
First print statement: 4
Second print statement: 4
First print statement: 5
Second print statement: 5

展现了流不像你期望的那样工作。参考实现有明显的偏好,即在处理下一个元素之前将每个元素通过整个流传递。当您启用并行处理时,相同的处理逻辑将在每个 CPU 核心上执行。

因此,当我使用

List<String> parsedNumbers = IntStream.range(1, 6)
    .parallel()
    .mapToObj(String::valueOf)
    .map(integerAsString -> {
        System.out.println("First print statement: " + integerAsString);
        return integerAsString;
    })
    .map(integerAsString -> {
        System.out.println("Second print statement: " + integerAsString);
        return integerAsString;
    })
    .collect(Collectors.toList());

我在我的计算机上看到了类似这样的东西:

First print statement: 5
First print statement: 2
First print statement: 1
First print statement: 4
First print statement: 3
Second print statement: 5
Second print statement: 2
Second print statement: 1
Second print statement: 4
Second print statement: 3

这可能看起来像先处理第一个打印语句,然后再处理第二个,但那只是因为CPU核心比流元素多,并且时机恰好而已。例如,当我将range(1,6)更改为range(1, 18)时,我会得到类似于以下的结果:

First print statement: 6
First print statement: 10
First print statement: 9
First print statement: 3
First print statement: 15
First print statement: 5
Second print statement: 9
First print statement: 11
First print statement: 8
Second print statement: 3
Second print statement: 11
Second print statement: 5
Second print statement: 10
Second print statement: 6
First print statement: 7
First print statement: 12
Second print statement: 8
Second print statement: 15
Second print statement: 12
Second print statement: 7
First print statement: 2
First print statement: 17
First print statement: 14
First print statement: 4
Second print statement: 14
Second print statement: 17
Second print statement: 2
First print statement: 1
First print statement: 16
First print statement: 13
Second print statement: 16
Second print statement: 1
Second print statement: 4
Second print statement: 13

不仅没有关于处理顺序的保证,也没有关于将处理哪些元素的保证,例如:

IntStream.range(1, 30)
    .filter(i -> i%13 == 1)
    .peek(i -> System.out.println("processing "+i))
    .parallel()
    .findFirst()
    .ifPresent(i -> System.out.println("result is "+i));

我的设置中产生的

processing 14
processing 1
processing 27
result is 1

因此,虽然结果保证为1,即按照遇到的顺序第一个匹配的元素,但不能保证按照它之后的遇到顺序不会处理其他元素。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接