内部并行流是否在考虑并行化外部流之前完全并行处理?

3

从这个链接中,我只部分理解到,至少在某个时刻,Java嵌套并行流存在问题。然而,我无法推断出以下问题的答案:

假设我有一个外层流和一个内层流,两者都使用并行流。根据我的计算,如果内层流完全并行执行,然后(仅当CPU核心可用时)再执行外层流,那么它将更具性能(由于数据本地性,即在L1 / L2 / L3 CPU缓存中缓存)。我认为对于大多数人来说,这是真实的情况。因此,我的问题是:

Java是否会先并行执行内部流,然后再处理外部流?如果是这样,它是在编译时还是运行时做出决定?如果在运行时,JIT是否足够聪明,能意识到如果内部流具有超过足够元素(例如数百个)与核心数(32)相比,则应该在处理内部流之前使用所有32个核心;但是,如果元素数量很小(例如<32),则“也可以同时处理”来自“下一个”外部流的元素。

2
你能展示一个你所说的例子吗?比如flatMap.parallel?或者streamA.... map(streamB.parallel...) - undefined
流并行几乎完全不可配置,它被设计为自动运行。如果您需要优化并行性,我建议根本不要使用流。它们本身就有很多开销。 - undefined
2个回答

7
也许以下示例程序可以解决这个问题:

或许下面的示例程序可以帮助理解:

IntStream.range(0, 10).parallel().mapToObj(i -> "outer "+i)
         .map(outer -> outer+"\t"+IntStream.range(0, 10).parallel()
            .mapToObj(inner -> Thread.currentThread())
            .distinct() // using the identity of the threads
            .map(Thread::getName) // just to be paranoid, as names might not be unique
            .sorted()
            .collect(Collectors.toList()) )
         .collect(Collectors.toList())
         .forEach(System.out::println);

当然,结果可能会有所不同,但在我的计算机上输出看起来类似于这样:
outer 0 [ForkJoinPool.commonPool-worker-6]
outer 1 [ForkJoinPool.commonPool-worker-3]
outer 2 [ForkJoinPool.commonPool-worker-1]
outer 3 [ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-5]
outer 4 [ForkJoinPool.commonPool-worker-5]
outer 5 [ForkJoinPool.commonPool-worker-2, ForkJoinPool.commonPool-worker-4, ForkJoinPool.commonPool-worker-7, main]
outer 6 [main]
outer 7 [ForkJoinPool.commonPool-worker-4]
outer 8 [ForkJoinPool.commonPool-worker-2]
outer 9 [ForkJoinPool.commonPool-worker-7]

我们可以看到,对于我的计算机来说,有八个核心,七个工作线程正在贡献他们的力量,以利用所有的核心。至于公共池,调用者线程也会参与工作,而不仅仅是等待完成。你可以清楚地在输出中看到主线程。
此外,你可以看到外层流得到了完整的并行性,而一些内层流则完全由单个线程处理。每个工作线程都会为外层流的至少一个元素做出贡献。如果你将外层流的大小减小到核心数,很可能会看到恰好一个工作线程处理一个外层流元素,这意味着所有内层流的完全顺序执行。
但我使用的数字与核心数不匹配,甚至不是它的倍数,以展示另一种行为。由于外层流处理的工作负载不均匀,即一些线程只处理一个项目,其他线程处理两个项目,这些空闲的工作线程执行工作窃取,为剩余的外层元素的内部流处理做出贡献。
这种行为背后有一个简单的理念。当外层流的处理开始时,它并不知道它将成为“外层流”。它只是一个并行流,没有办法找出这是否是一个外层流,除非处理它,直到其中一个函数开始另一个流操作。但是,在此之前推迟并行处理没有任何意义,因为可能永远不会到达这一点。
除此之外,我强烈反对你的假设“如果内部流首先完全并行执行,那么性能会更好”。我更倾向于期望实现的方式正好与现在的实现方式相同,适用于典型的用例。但是,如前一段所述,没有合理的方法来实现优先处理内部流的偏好。

我在考虑,首先并行处理内部流可能会更高效,因为我的内部流通常是集合,其中包含一些“大型”哈希映射,它刚好适应CPU的L3缓存。因此,如果内部流都并行操作,那么这个“大型”哈希映射将适应L3缓存,并且所有核心都可以访问它(并行)。但是,如果内部流按顺序操作(因为外部流是并行化的),那么每个内部流线程将竞争缓存大小的1/32(在32核机器上)。你有什么想法? - undefined
你有什么想法,@Holger? - undefined
一个 HashMap 不是一个内存块。HashMap、其支持数组、每个 Entry 实例、引用的键对象和值对象都是不同的对象,它们不一定相邻在内存中,甚至不能保证位于同一区域。试图将它们全部装入 L3 缓存中并没有任何优势。而且,每个线程最终仍然会使用¹/₃₂的 L3 缓存,无论它属于同一个 HashMap 还是不同的 HashMap,因为并行处理的基本原则是每个线程处理不同的数据块。 - undefined

1
根据我刚编写的小测试,答案是“否”(关于“Java是否会先并行执行内部流,然后再处理外部流”的问题)。请注意,“默认情况下,在我的机器上”会使用4个线程进行流操作。
    List<Integer> first = List.of(1, 2, 3, 4);
    List<Integer> second = List.of(5, 6, 7, 8);

    first.stream().parallel()
            .peek(x -> {
                System.out.println("first : " + x + " " + Thread.currentThread().getName());
            })
            .map(x -> second.stream().parallel().peek(y -> {

                System.out.println("second : " + y + " " + Thread.currentThread().getName());

            }).collect(Collectors.toList()))
            .filter(x -> true)
            .collect(Collectors.toList());

你可以从输出中看到内部流不是首先执行的。你可以增加每个流中的元素数量以获得更准确的输出(交错“first”和“second”-不知道是否是正确的术语)。
但是还有一件事让我感到困惑...上面的示例如何没有阻塞我无法理解。只有4个线程和4个元素,所有线程都在等待内部流进行处理;但是 ForkJoinPool 没有可用的线程来接管-那么它是如何工作的呢? 你提供的链接(@Holger's answer)说,将创建比实际请求的线程数更多的线程。但是它们的名称在输出中缺失...

2
似乎您误解了我的意思。如果线程被知道被阻塞,F/J会创建补偿线程。然而,在Stream API的情况下,调用者线程不会被阻塞,但会参与处理。早期的Java 8版本似乎在这个工作窃取方面存在问题,但这个问题似乎已经得到改善。当在工作线程中等待CompletableFuture时,您仍然可以看到补偿线程的效果。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接