Java CompletableFuture如何使用线程池

6
在下面的代码中,无论我将i的最大值设置为多少,线程总数从未超过13。它使用的是什么线程池?在哪里可以找到它的默认设置?
public static void main(String[] args) {
    // write your code here
    for (int i = 0; i <= 5; i++) {

        System.out.println("kick off" + i);
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);

                System.out.println(java.lang.Thread.activeCount());
            }
            catch (Exception e) {
                System.out.println("error");
            }
        });

    }
    System.out.println(java.lang.Thread.activeCount());
    try {
        Thread.sleep(10000);
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
}

1
你的电脑有几个处理器?(Runtime.getRuntime().availableProcessors()) - akuzminykh
1个回答

12

答案

线程池的选择由系统设置或当前可用的处理器数量决定。


文档与代码

根据 CompletableFuture 的文档:

所有没有显式指定执行器参数的 async 方法都使用 ForkJoinPool.commonPool()(除非它不支持至少为2的并行级别,在这种情况下,会创建一个新的线程来运行每个任务)。可以通过在子类中定义 defaultExecutor() 方法来覆盖此行为。[...]

根据 ForkJoinPool#commonPool() 的文档:

返回 common pool 实例。该池是静态构造的;[...]

根据 ForkJoinPool 类本身的文档:

构建 common pool 使用的参数可以通过设置以下系统属性来控制:

  • java.util.concurrent.ForkJoinPool.common.parallelism - 并行级别,非负整数
  • java.util.concurrent.ForkJoinPool.common.threadFactory - ForkJoinPool.ForkJoinWorkerThreadFactory 的类名。系统类加载器用于加载此类。
  • java.util.concurrent.ForkJoinPool.common.exceptionHandler - Thread.UncaughtExceptionHandler 的类名。系统类加载器用于加载此类。
  • java.util.concurrent.ForkJoinPool.common.maximumSpares - 维护目标并行性的允许的额外线程的最大数量(默认为256)。

如果没有通过系统属性提供线程工厂,则 common pool 使用一个使用系统类加载器作为线程上下文类加载器的工厂。

从那里开始,我们需要检查实际的源代码。根据 ForkJoinPool.java#L3208

common = AccessController.doPrivileged(new PrivilegedAction<>() {
    public ForkJoinPool run() {
        return new ForkJoinPool((byte)0); }});

从位于ForkJoinPool.java#L2345的构造函数中:

// [...]
String pp = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
// [...]
if (pp != null)
    parallelism = Integer.parseInt(pp);
// [...]
if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
// [...]
int n = (parallelism > 1) ? parallelism - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
n = (n + 1) << 1;
// [...]
this.workQueues = new WorkQueue[n];

就是这样。它的确定要么基于您的系统设置,要么基于您当前的处理器数量。


数学示例

假设您没有为java.util.concurrent.ForkJoinPool.common.parallelism设置任何内容,那么让我们快速计算一下代码:

在这种情况下,parallelism-1开始,因此我们有:

if (parallelism < 0 && // default 1 less than #cores
    (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;

假设您的机器上有8个内核。因此,您执行 parallelism = Runtime.getRuntime().availableProcessors() - 1,它将把 7 赋值给 parallelism。您没有进入 if,所以我们继续执行。
接下来是:
int n = (parallelism > 1) ? parallelism - 1 : 1;

这会从其中减去一部分,所以n = 6

接着

n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
n = (n + 1) << 1;

结果为16

其实这个操作是先减去2,然后找到下一个2的幂次方,再将结果翻倍。

所以如果你有6个核心,首先变成4,接着变成8,最后再乘以2,就得到了16


那么为什么是13?

为什么会出现13呢?我想你可能只有45个处理器,因此池会使用8个线程。但是,你测量的是Java总共使用的线程数。

System.out.println(java.lang.Thread.activeCount());

Java 目前可能使用大约 13 - 8 = 5 个线程来处理其他 "待机" 事项。


2
真快啊,我本来想等OP告诉我们他的处理器数量,这样我才能在我的回答中提到它。 - akuzminykh

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接