也许以下示例程序可以解决这个问题:
或许下面的示例程序可以帮助理解:
IntStream.range(0, 10).parallel().mapToObj(i -> "outer "+i)
.map(outer -> outer+"\t"+IntStream.range(0, 10).parallel()
.mapToObj(inner -> Thread.currentThread())
.distinct()
.map(Thread::getName)
.sorted()
.collect(Collectors.toList()) )
.collect(Collectors.toList())
.forEach(System.out::println);
当然,结果可能会有所不同,但在我的计算机上输出看起来类似于这样:
outer 0 [ForkJoinPool.commonPool-worker-6]
outer 1 [ForkJoinPool.commonPool-worker-3]
outer 2 [ForkJoinPool.commonPool-worker-1]
outer 3 [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-5]
outer 4 [ForkJoinPool.commonPool-worker-5]
outer 5 [ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-7, main]
outer 6 [main]
outer 7 [ForkJoinPool.commonPool-worker-4]
outer 8 [ForkJoinPool.commonPool-worker-2]
outer 9 [ForkJoinPool.commonPool-worker-7]
我们可以看到,对于我的计算机来说,有八个核心,七个工作线程正在贡献他们的力量,以利用所有的核心。至于公共池,调用者线程也会参与工作,而不仅仅是等待完成。你可以清楚地在输出中看到主线程。
此外,你可以看到外层流得到了完整的并行性,而一些内层流则完全由单个线程处理。每个工作线程都会为外层流的至少一个元素做出贡献。如果你将外层流的大小减小到核心数,很可能会看到恰好一个工作线程处理一个外层流元素,这意味着所有内层流的完全顺序执行。
但我使用的数字与核心数不匹配,甚至不是它的倍数,以展示另一种行为。由于外层流处理的工作负载不均匀,即一些线程只处理一个项目,其他线程处理两个项目,这些空闲的工作线程执行工作窃取,为剩余的外层元素的内部流处理做出贡献。
这种行为背后有一个简单的理念。当外层流的处理开始时,它并不知道它将成为“外层流”。它只是一个并行流,没有办法找出这是否是一个外层流,除非处理它,直到其中一个函数开始另一个流操作。但是,在此之前推迟并行处理没有任何意义,因为可能永远不会到达这一点。
除此之外,我强烈反对你的假设“如果内部流首先完全并行执行,那么性能会更好”。我更倾向于期望实现的方式正好与现在的实现方式相同,适用于典型的用例。但是,如前一段所述,没有合理的方法来实现优先处理内部流的偏好。
flatMap.parallel?
或者streamA.... map(streamB.parallel...)
。 - undefined