Java 8与11中ForkJoinPool的性能比较

5

Consider the following piece of code:

package com.sarvagya;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class Streamer {
    private static final int LOOP_COUNT  = 2000;
    public static void main(String[] args){
        try{
            for(int i = 0; i < LOOP_COUNT; ++i){
                poolRunner();
                System.out.println("done loop " + i);
                try{
                    Thread.sleep(50L);
                }
                catch (Exception e){
                    System.out.println(e);
                }
            }
        }
        catch (ExecutionException | InterruptedException e){
            System.out.println(e);
        }

        // Add a delay outside the loop to make sure all daemon threads are cleared before main exits.
        try{
            Thread.sleep(10 * 60 * 1000L);
        }
        catch (Exception e){
            System.out.println(e);
        }
    }

    /**
     * poolRunner method.
     * Assume I don't have any control over this method e.g. done by some library.
     * @throws InterruptedException
     * @throws ExecutionException
     */
    private static void poolRunner() throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(() ->{
            List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10, 11,12,14,15,16);
            List<Integer> collect = numbers.stream()
                    .parallel()
                    .filter(xx -> xx > 5)
                    .collect(Collectors.toList());
            System.out.println(collect);
        }).get();
    }
}

在上面的代码中,poolRunner方法创建了一个ForkJoinPool并向其中提交一些任务。当使用Java 8并将LOOP_COUNT设置为2000时,我们可以看到最大线程数约为3600,如下所示: Profiling info 图:性能分析

Max Threads in JDK 8 图:线程信息。

这些线程在一段时间后会降至几乎为10。然而,在OpenJDK 11中保持相同的LOOP_COUNT将产生以下错误:
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
[28.822s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 4k, detached.
Exception in thread "ForkJoinPool-509-worker-5" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
    at java.base/java.lang.Thread.start0(Native Method)
    at java.base/java.lang.Thread.start(Thread.java:803)
    at java.base/java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1329)
    at java.base/java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1352)
    at java.base/java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1476)
    at java.base/java.util.concurrent.ForkJoinPool.deregisterWorker(ForkJoinPool.java:1458)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:187)

很快就达到了最大线程限制。将LOOP_COUNT保持在500,工作正常,但是这些线程清除得非常缓慢,并且达到了约500个线程的平台。请参见下面的图像:

OpenJDK 11中的线程信息 图:OpenJDK 11中的线程信息

Profile Info 图:OpenJDK 11中的性能分析

在JDK 8中,线程被PARKED,但在JDK 11中,则为WAIT。Java 11中守护线程的数量也应该减少,但它很慢并且不能按预期工作。此外,假设我对poolRunner方法没有控制权。考虑到这个方法是由某个外部库提供的。
这个问题是OpenJDK 11的问题还是我的代码有问题。谢谢。

除了JDK版本之外,您确定测试条件完全相同吗?我看到两个分析快照中的最大堆不同,应用程序名称似乎也不同。 - Adonis
是的,同样的代码正在JDK11中进行检查。为了测试这段代码,IntelliJ配置已经更改。 - Mahadeva
其他设置呢?硬件相同,VM给定的参数相同吗?GC也一样吗?...等等 - Adonis
2个回答

9

你的代码创建了大量的ForkJoinPool实例,并且在使用完毕后从未调用任何池的shutdown()方法。由于在Java 8中,规范中没有保证工作线程会终止,因此这段代码甚至可能最终拥有2000(⟨池的数量⟩)倍于⟨核心数⟩的线程。

实际上,观察到的行为源于一个两秒的未经记录的空闲超时。请注意,根据评论所述,超时时间到期后的结果是试图缩小工作者数量,这与简单地终止不同。因此,如果n个线程遇到超时,不是所有n个线程都会终止,而是线程数减少一个,并且剩余线程可能会再次等待。此外,“初始超时值”这个短语已经暗示了这一点,每次发生超时时,实际超时时间都会增加。因此,对于n个空闲工作者线程来说,由于这个(未经记录的)超时,需要n * (n + 1)秒才能终止。

从Java 9开始,可以在ForkJoinPoolnew constructor中指定可配置的keepAliveTime,该构造函数还记录了默认值:

keepAliveTime
自上次使用以来经过的时间,然后终止线程(如果需要,则稍后替换)。对于默认值,请使用60, TimeUnit.SECONDS

这份文档可能会让人误以为现在所有的工作线程都可以在空闲keepAliveTime后一起终止,但实际上,仍然只有一个线程池在逐个缩小,尽管现在时间不再增加。因此,现在需要60 * n秒才能终止n个空闲工作线程。由于先前的行为未指定,甚至不兼容。

必须强调的是,即使具有相同的超时行为,结果的最大线程数也可能会发生变化,因为当具有更好代码优化的新JVM减少实际操作的执行时间(没有人为插入Thread.sleep(…))时,它将更快地创建新线程,而终止仍然受到挂钟时间的限制。


重点是当你知道线程池不再需要时,不应依赖于自动工作线程终止。相反,完成后应调用shutdown()
您可以使用以下代码验证行为:
int threadNumber = 8;
ForkJoinPool pool = new ForkJoinPool(threadNumber);
// force the creation of all worker threads
pool.invokeAll(Collections.nCopies(threadNumber*2, () -> {
    Thread.sleep(500);
    return "";
}));
int oldNum = pool.getPoolSize();
System.out.println(oldNum+" threads; waiting for dying threads");
long t0 = System.nanoTime();
while(oldNum > 0) {
    while(pool.getPoolSize()==oldNum)
        LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
    long t1 = System.nanoTime();
    oldNum = pool.getPoolSize();
    System.out.println(threadNumber-oldNum+" threads terminated after "
        +TimeUnit.NANOSECONDS.toSeconds(t1 - t0)+"s");
}

####Java 8:

8 threads; waiting for dying threads
1 threads terminated after 2s
2 threads terminated after 6s
3 threads terminated after 12s
4 threads terminated after 20s
5 threads terminated after 30s
6 threads terminated after 42s
7 threads terminated after 56s
8 threads terminated after 72s

####Java 11:

8 threads; waiting for dying threads
1 threads terminated after 60s
2 threads terminated after 120s
3 threads terminated after 180s
4 threads terminated after 240s
5 threads terminated after 300s
6 threads terminated after 360s
7 threads terminated after 420s

似乎永远不会完成,至少还有一个工作线程保持活动状态。


Holger,感谢您的回答。我看到在池上显式调用shutdown()确实可以提高性能,但这种更改不在我的控制范围内。看起来这个更改需要由库作者完成。 - Mahadeva

7
您的操作有误。
在上述代码中,我创建了一个ForkJoinPool并将一些任务提交给它。
实际上,您正在创建2000个ForkJoinPool实例...
相反,您应该创建一个适合当前任务的并行度(即线程数)的单个ForkJoinPool。
创建大量(即数千)的线程是一个非常糟糕的想法。即使您可以在不触发OOME的情况下完成它,您也会消耗大量的堆栈和堆内存,并对调度程序和垃圾收集器产生很大的负载...而没有真正的好处。

Stephen,问题是我对poolRunner方法没有控制权。假设这个方法来自某个库,该库内部使用ForkJoinPool,但我需要循环执行此方法。 - Mahadeva
1
抱歉,在那种情况下没有解决方案。如果那是问题的根源,那么如果想要解决方案,您需要改变问题本身。 - Stephen C
1
我不知道。但我认为如果你按照正确的方式做事情,这并不重要。 - Stephen C
4
该代码不仅创建了大量的线程池,而且在每个线程池使用后忘记调用shutdown()方法,因此规范中没有任何保证工作线程会终止。因此,在具有16个核心的机器上,它可能创建32000个永远不会死亡的线程,都符合规范要求。 - Holger
4
我们没有让并行流在指定的线程池中运行,而是始终在公共池中运行,这是有原因的——为了防止像作者所刻意安排的那样自我拍摄。 "poolRunner()" 的作者认为通过“智能地”“超越”运行时,并从 Stack Overflow 复制和粘贴了他们不理解的“解决方法”代码,这很聪明。 但是,这个代码是严重错误的;你的唯一选择就是不要调用它。 - Brian Goetz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接