并行的flatMap始终是顺序执行的。

15

假设我有这段代码:

 Collections.singletonList(10)
            .parallelStream() // .stream() - nothing changes
            .flatMap(x -> Stream.iterate(0, i -> i + 1)
                    .limit(x)
                    .parallel()
                    .peek(m -> {
                        System.out.println(Thread.currentThread().getName());
                    }))
            .collect(Collectors.toSet());

输出的是相同的线程名称,因此在这里使用parallel没有任何好处 - 我的意思是有一个单独的线程完成所有工作。

flatMap中有如下代码:

result.sequential().forEach(downstream);

如果“外部”流是并行的(它们可能会阻塞),则理解强制使用sequential属性,“外部”必须等待“flatMap”完成,反之亦然(因为使用相同的公共池)。但为什么总是强制这样做呢?

这是不是那种在以后的版本中可以改变的事情之一?

2个回答

16

有两个不同的方面。

首先,只有一个单一的流水线,可以是顺序的或并行的。在内部流中选择顺序或并行是无关紧要的。请注意,引用代码片段中看到的 downstream 消费者代表整个后续流水线,因此在您的代码中,以 .collect(Collectors.toSet()); 结尾,此消费者最终将元素添加到一个不安全的单一 Set 实例中。因此,使用该单个消费者并行处理内部流会破坏整个操作。

如果外部流被分割,引用的代码可能会与不同的消费者同时调用,这些消费者将添加到不同的集合中。每个这样的调用都会处理映射到不同内部流实例的外部流的不同元素。由于您的外部流仅包含一个元素,因此无法拆分。

这种实现方式也是 Java 流中为什么 flatMap() 后面的 filter() 不完全懒惰? 问题的原因,因为 forEach 在内部流上调用,该内部流将所有元素传递给下游消费者。正如 此答案 所示,另一种支持惰性和子流分割的实现是可能的。但这是一种根本不同的实现方式。流实现的当前设计主要通过消费者组合来工作,因此最终源拆分器(以及从它拆分出的拆分器)在 tryAdvanceforEachRemaining 中接收代表整个流水线的Consumer。相比之下,链接答案的解决方案使用拆分器组合,生成一个新的委托给源拆分器的Spliterator。我想,两种方法都有优点,我不确定 OpenJDK 实现在采用另一种方式时会失去多少。


1
@holi-java 我不会说这是一个 bug,只是实现设计较差,很可能在未来得到修复。 - Jacob G.
6
@holi-java: 缺少惰性可能被视为一个缺陷,已经有一份相应的缺陷报告。然而,有限制的并行性只是潜在性能提升的一个领域。在实践中,这仅会影响到外部流元素数量较小、内部流元素数量较大的流。 - Holger
1
@Holger 我目前并不是在寻找解决方案 - 只是出于纯粹的兴趣。像往常一样,从你这里得到了非常有趣的阅读体验。 - Eugene
1
@DmytroBuryak,您可以尝试链接答案中的解决方案。 - Holger
2
@DmytroBuryak 我仍然希望有一个内置的解决方案... - Holger
显示剩余5条评论

4

对于像我这样迫切需要并行化flatMap的人,需要一些实际解决方案,而不仅仅是历史和理论。

我想到的最简单的解决方案是手动展开,基本上是通过将其替换为 map + reduce(Stream::concat) 来实现。

以下示例演示了如何执行此操作:

@Test
void testParallelStream_NOT_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // does not parallelize nested streams
                .flatMap(i -> generateRangeParallel(i, 100))

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

@Test
void testParallelStream_WORKING() throws InterruptedException, ExecutionException {
    new ForkJoinPool(10).submit(() -> {
        Stream.iterate(0, i -> i + 1).limit(2)
                .parallel()

                // concatenation of nested streams instead of flatMap, parallelizes ALL the items
                .map(i -> generateRangeParallel(i, 100))
                .reduce(Stream::concat).orElse(Stream.empty())

                .peek(i -> System.out.println(currentThread().getName() + " : generated value: i=" + i))
                .forEachOrdered(i -> System.out.println(currentThread().getName() + " : received value: i=" + i));
    }).get();
    System.out.println("done");
}

Stream<Integer> generateRangeParallel(int start, int num) {
    return Stream.iterate(start, i -> i + 1).limit(num).parallel();
}

// run this method with produced output to see how work was distributed
void countThreads(String strOut) {
    var res = Arrays.stream(strOut.split("\n"))
            .map(line -> line.split("\\s+"))
            .collect(Collectors.groupingBy(s -> s[0], Collectors.counting()));
    System.out.println(res);
    System.out.println("threads  : " + res.keySet().size());
    System.out.println("work     : " + res.values());
}

在我的计算机上运行的统计数据:

NOT_WORKING case stats:
{ForkJoinPool-1-worker-23=100, ForkJoinPool-1-worker-5=300}
threads  : 2
work     : [100, 300]

WORKING case stats:
{ForkJoinPool-1-worker-9=16, ForkJoinPool-1-worker-23=20, ForkJoinPool-1-worker-21=36, ForkJoinPool-1-worker-31=17, ForkJoinPool-1-worker-27=177, ForkJoinPool-1-worker-13=17, ForkJoinPool-1-worker-5=21, ForkJoinPool-1-worker-19=8, ForkJoinPool-1-worker-17=21, ForkJoinPool-1-worker-3=67}
threads  : 10
work     : [16, 20, 36, 17, 177, 17, 21, 8, 21, 67]

使用Stream::concat(否则为空的流)进行reduce操作效果非常好 :) - Avi
.reduce(Stream::concat)难道不就相当于将结果收集到一个LinkedList中使用.collect()吗?” - Jack Edmonds
非常感谢。在找到这个之前,我一直很痛苦地试图弄清楚我的并行性出了什么问题。 - beirtipol

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接