Java 8中forEachOrdered()和sequential()方法的区别是什么?

5

我正在处理 Java 8 并行流,希望以某种顺序(例如插入顺序、反向顺序或顺序顺序)打印并行流中的元素。

为此,我尝试了以下代码:

        System.out.println("With forEachOrdered:");
        listOfIntegers
            .parallelStream()
            .forEachOrdered(e -> System.out.print(e + " "));
        System.out.println("");

        System.out.println("With Sequential:");
        listOfIntegers.parallelStream()
                    .sequential()
                    .forEach(e -> System.out.print(e + " "));

对于这两个问题,我得到了相同的输出结果,如下所示:

With forEachOrdered:
1 2 3 4 5 6 7 8 
With Sequential:
1 2 3 4 5 6 7 8 

从API文档中,我可以看到:

forEachOrdered -> 这是一个终端操作。

以及

sequential -> 这是一个中间操作。

那么我的问题是哪个更好使用?在什么情况下,应该优先选择其中之一?

3个回答

8

listOfIntegers.parallelStream().sequential().forEach() 创建了一个并行流并将其转换为顺序流。因此,您可以直接使用 listOfIntegers.stream().forEach() 以便在一开始就获得顺序流。

listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " ")) 在并行流上执行操作,但保证按照流的遇到顺序(如果流具有定义的遇到顺序)消耗元素。然而,它可以在多个线程上执行。

我看不出使用 listOfIntegers.parallelStream().sequential() 的任何理由。如果您想要一个顺序流,为什么要先创建一个并行流呢?


因为您可能希望并行执行一些重型操作,然后重新排序流以进行最终输出。 - Adrian Shum
@AdrianShum 我不确定这种方法在性能方面是否有效,因为流是惰性求值的。它不会并行执行所有元素的第一步中间操作,然后再顺序执行其余的中间操作和终端操作。只有当你到达终端操作时,它才开始对单个元素执行中间操作,直到它们成为终端操作的输入。 - Eran
这是正确的。也许只有在我编写一些工具来处理提供的流并且希望确保它是顺序的时,它才有用,即使用户提供了并行流? - Adrian Shum
4
list.parallelStream().sorted().forEachOrdered(…) 视为一种实际的示例,它可以并行进行大量工作,但是终端操作需要有序。 - Holger
4
没问题。这是一个操作的示例,其中使用.parallelStream(). … .forEachOrdered(…).parallelStream(). … sequential().forEach(…)会产生明显的差异。正如你所说,“流是惰性求值的”,因此如果终端操作由于排序约束而阻塞了工作线程,则除非你有像sorted()这样的有状态中间操作,否则并没有太多并行性。 - Holger
显示剩余2条评论

4
你提出了一个有些误导性的问题,首先你问到:
 .parallelStream()
 .forEachOrdered(...)

这将创建一个并行流,但元素将按顺序被消耗。如果您添加像这样的map操作:

.map(...)
.parallelStream()
.forEachOrdered(...)

这将使得map操作非常有限(从并行处理的角度来看),因为线程必须等待所有其他元素按照遇到的顺序被处理(被forEachOrdered消费)。这涉及无状态操作。

另一方面,如果您有一个有状态的操作,例如:

.parallelStream()
.map()
.sorted()
.// other operations

由于sorted是有状态的,因此在并行处理之前进行无状态操作比它更大的好处。这是因为sorted必须从流中收集所有元素,而线程不必在遇到元素的顺序时“等待”(在forEachOrdered处)。

对于第二个示例:

listOfIntegers.parallelStream()
                .sequential()
                .forEach(e -> System.out.print(e + " "))

你的意思是打开并且关闭并行化。数据流是由终端操作驱动的,所以即使你这样做:

 .map...
 .filter...
 .parallel()
 .map...
 .sequential

这意味着整个流水线将按顺序执行,而不是部分并行和部分顺序。您还依赖于 forEach 保留顺序,并且可能目前确实如此,但在以后的版本中可能会发生变化,因为您一开始使用 forEach 表示您不关心顺序,所以元素将进行内部洗牌。


2

流水线可以顺序执行或并行执行。这种执行模式是流的属性。流是带有初始顺序或并行执行选择的。例如,Collection.stream()创建一个顺序流,而Collection.parallelStream()创建一个并行流。选择执行模式可以通过BaseStream.sequential()BaseStream.parallel()方法修改。

因此,无需使用:

listOfIntegers.parallelStream().sequential()

您只能使用:

listOfIntegers.stream()

如果您正在创建一个并行流,那么流的元素可能会被不同的线程处理。forEach和forEachOrdered之间的区别在于,forEach允许以任何顺序处理并行流的任何元素,而forEachOrdered始终按照它们在原始流中出现的顺序处理并行流的元素。当使用parallelStream()和forEachOrdered时,可以很好地利用多个核心并仍然保留输出的顺序。请注意,forEachOrdered强制迭代流元素的顺序。但是,在forEachOrdered之前链接的任何操作仍将以并行方式发生,因为流是并行流。
Oracle没有确切地记录在管道中多次更改流执行模式会发生什么。不清楚最后一次更改是否重要或者在调用parallel()后调用的操作是否可以并行执行,而在调用sequential()后调用的操作将按顺序执行。

3
在流水线中多次更改流执行模式会发生什么事情并没有确切的记录。我不同意这个说法,因为在 Stream 类的文档中已经明确说明:"流水线可以顺序执行或并行执行"。 - Anlon Burke
@AnlonBurke 请仔细阅读Oracle未记录的最终内容。 - Alex Mamo
4
整个管道要么按顺序执行,要么并行执行。在我看来,这就是 Javadoc 中这句话的意思。不可能有一部分是“并行”的,而另一部分是“顺序”的(事实上,在流 API 最初开发的某个时期尝试过这种方式,但因为太复杂而被放弃了)。我不明白你对此的疑虑从何而来。 - Sartorius

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接