当ThreadPoolExecutor的corePoolSize为0时,直到任务队列已满之前不应执行任何任务。

11
我正在阅读《Java Concurrency In Practice》,其中的第8.3.1节——线程创建和清理让我卡住了。下面的脚注警告我们要保持corePoolSize为零。

开发人员有时会将核心大小设置为零,以便工作线程最终被销毁,从而不会阻止JVM退出,但这可能会导致一些在其工作队列中不使用SynchronousQueue(如newCachedThreadPool)的线程池产生奇怪的行为。 如果线程池已经达到核心大小,则ThreadPoolExecutor仅在工作队列已满时创建新线程。 因此,对于具有任何容量和核心大小为零的工作队列的线程池提交的任务将不执行,直到队列填满,这通常不是所期望的。

因此,为了验证这一点,我编写了以下程序,它并不按照上述方式工作。

    final int corePoolSize = 0;
    ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // If the pool is already at the core size
    if (tp.getPoolSize() == corePoolSize) {
        ExecutorService ex = tp;

        // So tasks submitted to a thread pool with a work queue that has any capacity
        // and a core size of zero will not execute until the queue fills up.
        // So, this should not execute until queue fills up.
        ex.execute(() -> System.out.println("Hello"));
    }

输出: Hello

因此,该程序的行为是否表明ThreadPoolExecutor在提交任务时创建至少一个线程,而不考虑corePoolSize=0。 如果是,则文本书中的警告是什么。

编辑:根据@S.K.的建议,在jdk1.5.0_22中测试了以下更改的代码:

ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1));//Queue size is set to 1.

但是随着这个更改,程序无法打印任何输出而终止。
所以我对书中的这些陈述有误解吗?
编辑(@sjlee):在评论中添加代码很困难,因此我将其作为编辑添加在此处...您可以尝试进行此修改,并针对最新的JDK和JDK 1.5运行它。
final int corePoolSize = 0;
ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

// If the pool is already at the core size
if (tp.getPoolSize() == corePoolSize) {
    ExecutorService ex = tp;

    // So tasks submitted to a thread pool with a work queue that has any capacity
    // and a core size of zero will not execute until the queue fills up.
    // So, this should not execute until queue fills up.
    ex.execute(() -> System.out.println("Hello"));
}
tp.shutdown();
if (tp.awaitTermination(1, TimeUnit.SECONDS)) {
    System.out.println("thread pool shut down. exiting.");
} else {
    System.out.println("shutdown timed out. exiting.");
}

@sjlee已在评论中发布了结果。


@sjlee JDK 1.5 的输出是 thread pool shut down. exiting.,JDK 1.8 的输出是 Hello thread pool shut down. exiting. - Steve
好的,谢谢。正如您在下面发现的那样,JDK 在这方面的行为似乎从 1.5 版本到 1.6 版本稍有变化。 - sjlee
@sjlee 但是在任何Java版本中的代码都没有推断出教科书中提到的文本。我是否错误地解释了这段文字?根据我在问题中的示例,即使任务队列已满且corePoolSize=0,任务也不会执行。我无法理解这一点。 - Steve
@sjlee 我已经在我的问题中提到了。看看jdk1.5的测试结果。这里,核心池大小=0,任务队列大小=1,提交的任务数量=1。 - Steve
我认为JCP的另一个脚注也是错误的:allowCoreThreadTimeOut允许您请求所有池线程都能够超时;如果您想要一个有界线程池和有界工作队列,但仍然希望在没有工作可做时关闭所有线程,请使用零个核心大小启用此功能。 "使用零个核心大小启用此功能"没有意义。 - Jason
显示剩余3条评论
4个回答

6

当Java 5的核心池大小为零时,ThreadPoolExecutor的这种奇怪行为显然被认为是一个错误,并在Java 6中悄悄地修复了。

事实上,在Java 7中,由于在6和7之间进行了一些代码重构,该问题重新出现。随后报告了这个问题,并将其确认为bug并进行了修复。

无论如何,您都不应该使用受此bug影响的Java版本。 Java 5已经于2015年到期,Java 6及以上的最新版本也没有受到影响。《Java并发编程实战》的该部分内容已不再适用。

参考资料:


4
在运行这个程序时,我发现在jdk 1.5、1.6、1.7和1.8中,ThreadPoolExecutor#execute(Runnable)的不同实现。具体来说,如下所示:

JDK 1.5 的实现

 //Here poolSize is the number of core threads running.

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    for (;;) {
        if (runState != RUNNING) {
            reject(command);
            return;
        }
        if (poolSize < corePoolSize && addIfUnderCorePoolSize(command))
            return;
        if (workQueue.offer(command))
            return;
        Runnable r = addIfUnderMaximumPoolSize(command);
        if (r == command)
            return;
        if (r == null) {
            reject(command);
            return;
        }
        // else retry
    }
}

corePoolSize 为0时,该实现不会创建线程,因此所提供的任务也不会被执行。

JDK 1.6 实现

//Here poolSize is the number of core threads running.

  public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

JDK 1.6即使corePoolSize为0,也会创建一个新的线程。

JDK 1.7+实现(与JDK 1.6类似,但锁定和状态检查更好)

    public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

JDK 1.7即使corePoolSize为0,也会创建新线程。

因此,在JDK 1.5和JDK 1.6+的每个版本中,corePoolSize=0都是一个特殊情况。

但令人奇怪的是,该书的解释与任何程序结果都不匹配。


3
我不明白你是如何得出“书中的解释与任何程序结果都不一致”的结论的,因为你已经承认Java 5的结果与描述相符。 - Holger
@Holger 我在Java 5中运行了以下代码,预期会打印“Hello”,但实际上没有打印出来。final int corePoolSize = 0; ThreadPoolExecutor tp = new ThreadPoolExecutor(corePoolSize, 1, 5, TimeUnit.SECONDS,new LinkedBlockingQueue<>(1));//队列大小设置为1。 if (tp.getPoolSize() == corePoolSize) { ExecutorService ex = tp; ex.execute(() -> System.out.println("Hello")); } - Steve
3
按照书上所说的做,那么你为什么声称“书中的解释与任何一个程序结果都不相符呢?”。 - Holger

0

你可以修改一个BlockingQueue,使其不通过offer(由Executor使用)接受任何Runnable,并在拒绝情况下通过“add”添加。 该设置具有0个核心线程,并在作业排队之前填充32个运行线程。我认为这是许多人期望的,首先填充最多运行线程,然后再排队。

    private static BlockingQueue<Runnable> workQueue = new LinkedBlockingDeque<>()  {
        private static final long serialVersionUID = 1L;
        @Override public boolean offer(Runnable e) { return false; }
    };

    private static ThreadPoolExecutor executor  = new ThreadPoolExecutor(0, 32, 10, TimeUnit.SECONDS, workQueue, (r,e)->workQueue.add(r));

0

看起来这是由于旧版本Java中的一个bug,但在Java 1.8中已不存在了。

根据Java 1.8文档中ThreadPoolExecutor.execute()所述:

     /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * ....
     */

在第二点中,在将工作人员添加到队列后,会进行重新检查,如果可以启动新线程而不是排队任务,则回滚排队并启动新线程。
这就是正在发生的事情。在第一次检查期间,任务被排队,但在重新检查期间,启动了一个新线程来执行您的任务。

1
我在 jdk1.6.0_45 中运行了这个程序,它输出的结果是相同的。 - Steve
问题可能出在JDK 1.5上。我只能在谷歌上找到这个链接,暗示这个问题已经在1.6中得到了解决:http://cs.oswego.edu/pipermail/concurrency-interest/2006-December/003453.html - S.K.
1
我已经添加了 jdk1.5.0_22 的测试结果。请查看问题中的更新。在我看来,如果这是一个错误,它本应该在教科书中提到。 - Steve
我已经添加了一个合理的解释。请看一下。 - Steve

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接