如何让ThreadPoolExecutor在排队之前增加线程数达到最大值?

120

对于支持许多人使用的ExecutorService线程池的默认行为ThreadPoolExecutor 我已经感到沮丧了一段时间。引用Javadocs的话:

如果正在运行的线程数超过corePoolSize但小于maximumPoolSize,则仅在队列已满的情况下创建新线程。

这意味着,如果您使用以下代码定义线程池,它将永远不会启动第二个线程,因为LinkedBlockingQueue是无界的。

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue*/));
只有当您拥有一个有界队列并且该队列已满时,才会启动超出核心数的线程。我怀疑许多初级Java多线程程序员不知道ThreadPoolExecutor的这种行为。 现在我有一个特定的用例,其中这个方案不是最优的。我正在寻找方法,在不编写自己的TPE类的情况下解决它。 我的要求是针对可能不可靠的第三方进行回调的Web服务。
  • 我不想与Web请求同步地进行回调,因此我想使用线程池。
  • 我通常每分钟收到几个请求,因此我不想使用具有大量处于休眠状态的线程的newFixedThreadPool(...)
  • 每隔一段时间,我会出现这种流量的爆发,而我希望将线程数增加到某个最大值(假设为50)。
  • 我需要尽可能地执行所有回调,因此我希望将任何超过50个以上的回调排队。我不想使用newCachedThreadPool()来超负荷其余的Web服务器。
如何解决ThreadPoolExecutor中的这种限制,即需要在队列被限制并且已满之前才会启动更多的线程?怎样才能在排队任务之前启动更多的线程? 编辑: @Flavio提出了一个很好的建议,即使用ThreadPoolExecutor.allowCoreThreadTimeOut(true)使核心线程超时并退出。我考虑过这一点,但我仍然想要核心线程功能。如果可能的话,我不希望池中的线程数低于核心大小。

1
考虑到您的示例最多创建10个线程,使用可增长/缩小的固定大小线程池是否真的能够节省资源? - bstempi
@bstempi说得好。这个数字有点随意。我已经在问题中将其增加到50了。现在我有了这个解决方案,不太确定我实际上想要多少并发线程。 - Gray
1
哎呀!如果我能得到10个赞,那就好了,和我现在的处境完全一样。 - Eugene
10个回答

61

如何解决 ThreadPoolExecutor 的限制,即队列需要被限定并填满后才能启动更多线程。

我相信我已经找到了一个比较优雅(也许有点取巧)的解决方案来解决 ThreadPoolExecutor 的限制。它涉及到扩展 LinkedBlockingQueue 使其在已经排队一些任务时返回 falsequeue.offer(...)。如果当前线程无法跟上排队的任务,则 TPE 将添加额外的线程。如果池中的线程已达到最大值,则将调用 RejectedExecutionHandler 来执行 put(...) 操作将任务放入队列中。

编写一个队列,其中offer(...)可以返回false而且put()从不阻塞,这确实很奇怪,所以这是个取巧的地方。但是这与 TPE 对队列的使用非常契合,所以我认为没有什么问题。

以下是代码:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});
使用这个机制时,当我将任务提交到队列时,ThreadPoolExecutor将会:
  1. 将线程数扩展到核心大小(这里是1)。
  2. 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
  3. 如果队列已经有一个或多个元素,则offer(...)将返回false。
  4. 如果返回false,则将线程池中的线程数量扩展到最大值(这里是50)。
  5. 如果已经达到最大值,则调用RejectedExecutionHandler
  6. RejectedExecutionHandler然后将任务放入队列,以便由FIFO顺序中的第一个可用线程处理。

尽管在上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将容量设置为1000的LinkedBlockingQueue,则它将:

  1. 将线程数扩展到最大值
  2. 然后排队,直到具有1000个任务的队列已满
  3. 然后阻止调用者,直到队列中有空间可用为止。

此外,如果您需要在RejectedExecutionHandler中使用offer(...),则可以改用offer(E, long, TimeUnit)方法,其中超时为Long.MAX_VALUE

警告:

如果您希望在执行程序已关闭后添加任务,则可能希望在自定义的RejectedExecutionHandler中更加智能地抛出RejectedExecutionException。感谢@RaduToader指出这一点。

编辑:

对此答案的另一个微调可能是询问TPE是否有空闲线程,并仅在有空闲线程时将项目排队。您需要为此创建一个真正的类,并在其上添加ourQueue.setThreadPoolExecutor(tpe);方法。

然后,您的offer(...)方法可能如下所示:

  1. 检查tpe.getPoolSize() == tpe.getMaximumPoolSize(),如果是,则只需调用super.offer(...)
  2. 否则,如果tpe.getPoolSize() > tpe.getActiveCount(),则调用super.offer(...),因为似乎有空闲线程。
  3. 否则返回false以分叉另一个线程。

也许是这样的:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

请注意,TPE上的get方法很昂贵,因为它们访问volatile字段或(在 getActiveCount()的情况下)锁定TPE并遍历线程列表。此外,这里存在竞争条件,可能会导致任务被不正确地排队或在有空闲线程时另一个线程被分叉。


1
我也曾经遇到过同样的问题,最终用覆盖 execute 方法解决了。但这真的是一个很好的解决方案。 :) - codingenious
尽管我不喜欢打破 Queue 合约来完成这个任务的想法,但你绝对不是唯一一个持有这种想法的人:http://groovy-programming.com/post/26923146865 - bstempi
4
你是否感到奇怪,前几个任务会被排队,只有在这之后才会产生新的线程?比如,如果你的单核线程正在忙于一个长时间运行的任务,并且你调用 "execute(runnable)",那么 "runnable" 就只是被添加到队列中。如果你调用 "execute(secondRunnable)",那么 "secondRunnable" 就会被添加到队列中。但是现在,如果你调用 "execute(thirdRunnable)",那么 "thirdRunnable" 会在新线程中运行。只有在 "thirdRunnable" (或原始的长时间运行任务) 完成后,"runnable" 和 "secondRunnable" 才会运行。 - Robert Tupelo-Schneck
1
没错,Robert是对的,在高度多线程的环境中,队列有时会在有空闲线程可用的情况下继续增长。下面这个扩展TPE的解决方案效果要好得多。我认为应该将Robert的建议标记为答案,即使上述hack也很有趣。 - Wanna Know All
1
“RejectedExecutionHandler” 在关闭执行器时提供了帮助。现在你被强制使用 shutdownNow(),因为 shutdown() 无法阻止新任务的添加(由于请求)。 - Radu Toader
显示剩余9条评论

40

将核心大小和最大大小设置为相同的值,并使用 allowCoreThreadTimeOut(true) 允许从池中删除核心线程。


2
+1 是的,我也考虑过这个问题,但我仍然想要核心线程功能。我不希望在线程处于休眠期间时线程池变为0线程。我会编辑我的问题来指出这一点。但你提出了一个很好的观点。 - Gray
1
谢谢!那只是最简单的方法。 - Dmitry Ovchinnikov

37

我已经在这个问题上得到了另外两个答案,但我怀疑这一个是最好的。

它基于目前被接受的答案的技术,即:

  1. 重写队列的offer()方法来(有时)返回false,
  2. 这将导致ThreadPoolExecutor要么生成新线程,要么拒绝该任务,并且
  3. 设置RejectedExecutionHandler来在拒绝时实际地将任务排入队列。

问题在于offer()应该何时返回false。目前被接受的答案在队列有几个任务时返回false,但正如我在那里的评论中指出的那样,这会导致不良影响。或者,如果你总是返回false,你将不断生成新线程,即使你有线程在等待队列。

解决方案是使用Java 7中的LinkedTransferQueue,并让offer()调用tryTransfer()。当有等待的消费者线程时,任务将直接传递给该线程。否则,offer()将返回false,ThreadPoolExecutor将生成一个新线程。

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

我必须承认,这对我来说看起来最干净。唯一的缺点是,LinkedTransferQueue是无界的,因此如果不进行额外的工作,您将无法获得容量有限的任务队列。 - Yeroc
当线程池达到最大容量时会出现问题。例如,当线程池扩展到最大容量并且每个线程都正在执行任务时,如果提交的可运行对象被拒绝,那么ThreadPoolExecutor将尝试添加工作线程,但是线程池已经达到了最大容量,因此可运行对象将被简单地拒绝。根据您编写的rejectedExceHandler,它将再次被提供到队列中,导致从头开始发生这种猴子舞。 - Sudheera
1
@Sudheera 我认为你错了。queue.offer()实际上是调用LinkedTransferQueue.tryTransfer(),会返回false而不是将任务入队。然而,RejectedExecutionHandler调用的是queue.put(),它不会失败并且会将任务入队。 - Robert Tupelo-Schneck
1
@RobertTupelo-Schneck非常有用且友好! - Eugene
2
@RobertTupelo-Schneck 运行得非常好!我不知道为什么 Java 没有类似的开箱即用功能。 - Georgi Peev

7
注意:我现在更喜欢并推荐我的另一个答案
以下是一种对我来说更为直接的版本:每当执行一个新任务时,增加corePoolSize(最多增加到maximumPoolSize的限制),然后每当任务完成时,减少corePoolSize(最少减少到用户指定的“core pool size”的限制)。
换句话说,跟踪正在运行或排队的任务数,并确保corePoolSize等于任务数,只要它在用户指定的“core pool size”和maximumPoolSize之间。
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

当前类不支持在构造后更改用户指定的corePoolSize或maximumPoolSize,也不支持直接或通过remove()purge()操作工作队列。


我喜欢它,除了synchronized块。你能通过队列调用以获取任务数量吗?或者也许使用AtomicInteger - Gray
我本来想避免这些问题,但问题在于:如果有多个单独的线程中存在许多 execute() 调用,每个线程都会 (1) 确定需要多少个线程,(2) 将 setCorePoolSize 设置为该数字,然后 (3) 调用 super.execute()。如果步骤 (1) 和 (2) 没有同步,我不确定如何防止不幸的顺序,即在设置更高的数字后将核心池大小设置为较低的数字。如果可以直接访问超类字段,则可以使用比较和设置来完成此操作,但是我没有看到在子类中进行同步的简洁方法。 - Robert Tupelo-Schneck
我认为该竞争条件的惩罚相对较低,只要 taskCount 字段有效(即 AtomicInteger)。如果两个线程立即在彼此之后重新计算池大小,则应获得适当的值。如果第二个收缩核心线程,则必须已看到队列中的减少或其他情况。 - Gray
1
很遗憾,我认为情况比那更糟。 假设任务10和11调用execute()。 每个都将调用atomicTaskCount.incrementAndGet(),并分别得到10和11。 但是,如果没有同步(在获取任务计数和设置核心池大小时),则可能会出现以下情况:(1)任务11将核心池大小设置为11,(2)任务10将核心池大小设置为10,(3)任务10调用super.execute(),(4)任务11调用super.execute()并排队。 - Robert Tupelo-Schneck
2
我对这个解决方案进行了严格的测试,它显然是最好的。在高度多线程环境下,由于自由线程TPE.execute的性质,它仍然会偶尔排队等待空闲线程,但这种情况很少发生,相比被标记为答案的解决方案,竞争条件更容易发生,因此几乎每次多线程运行都会出现这种情况。 - Wanna Know All

6
我们有一个 ThreadPoolExecutor 的子类,它额外接收一个参数 creationThreshold 并重写了 execute 方法。
public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

也许这也有帮助,但你的看起来当然更加艺术化...

有趣。谢谢你提供这个信息。我实际上不知道核心大小是可变的。 - Gray
现在我再想一想,就检查队列大小而言,这个解决方案比我的更好。我已经调整了我的答案,使offer(...)方法只有在特定条件下才返回false。谢谢! - Gray

4
推荐的解决方案只解决了JDK线程池中一个问题:
1. JDK线程池倾向于排队而不是生成新的线程,只有在队列达到极限时,线程池才会生成新线程。
2. 当负载减轻时,线程退役并没有发生。例如,如果我们有一堆任务击中池,导致池达到最大值,然后是最多每次2个任务的轻负载,池将使用所有线程来服务轻负载,从而防止线程退役。(只需要2个线程...)
对于上述行为不满意的情况下,我前去实现了一个池来克服上述的缺陷。
要解决第二种情况,使用Lifo调度可以解决这个问题,这个想法是由Ben Maurer在2015年ACM applicative大会上提出的: Systems @ Facebook scale 因此,便产生了一个新的实现: LifoThreadPoolExecutorSQP 到目前为止,这个实现提高了异步执行性能,适用于ZEL
这个实现具有旋转能力,以减少上下文切换的开销,从而在某些用例中产生更好的性能。
希望这可以帮助一些人...
PS: JDK Fork Join Pool 实现 ExecutorService,并作为“正常”线程池使用,实现具有良好的性能,它使用LIFO线程调度,但是对于内部队列大小、退役超时等没有控制,最重要的是取消任务时无法中断它们。

1
太遗憾了,这个实现有太多的外部依赖关系。对我来说毫无用处 :-/ - Martin L.
1
这是一个非常好的观点(第二点)。不幸的是,它的实现并不清晰,因为存在外部依赖,但如果您愿意,仍然可以采纳。 - Alexey Vlasov

1

注意:我现在更喜欢并推荐我的其他答案

我有另一个提议,跟随将队列更改为返回false的原始想法。在这个提议中,所有任务都可以进入队列,但是每当一个任务在execute()之后进入队列时,我们会用一个哨兵无操作任务跟随它,队列会拒绝它,导致新线程产生,该线程将立即执行无操作,然后执行队列中的任务。

因为工作线程可能正在轮询LinkedBlockingQueue以获取新任务,所以即使有可用线程,任务也可能被加入队列。为了避免即使有可用线程也要产生新线程,我们需要跟踪等待队列上新任务的线程数量,并且仅在队列中的任务多于等待线程时才产生新线程。

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});

0
以下是一个使用两个线程池的解决方案,它们的核心和最大池大小都相同。当第一个池忙碌时,将使用第二个池。
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MyExecutor {
    ThreadPoolExecutor tex1, tex2;
    public MyExecutor() {
        tex1 = new ThreadPoolExecutor(15, 15, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        tex1.allowCoreThreadTimeOut(true);
        tex2 = new ThreadPoolExecutor(45, 45, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        tex2.allowCoreThreadTimeOut(true);
    }

    public Future<?> submit(Runnable task) {
        ThreadPoolExecutor ex = tex1;
        int excessTasks1 = tex1.getQueue().size() + tex1.getActiveCount() - tex1.getCorePoolSize();
        if (excessTasks1 >= 0) {
            int excessTasks2 = tex2.getQueue().size() + tex2.getActiveCount() - tex2.getCorePoolSize();;
            if (excessTasks2 <= 0 || excessTasks2 / (double) tex2.getCorePoolSize() < excessTasks1 / (double) tex1.getCorePoolSize()) {
                ex = tex2;
            }
        }
        return ex.submit(task);
    }
}

0

我能想到的最好的解决方案是扩展。

ThreadPoolExecutor提供了一些钩子方法:beforeExecuteafterExecute 。 在您的扩展中,您可以使用有界队列来提供任务,并使用第二个无界队列来处理溢出。 当有人调用submit时,您可以尝试将请求放入有界队列中。 如果遇到异常,只需将任务放入溢出队列中。 然后,您可以利用afterExecute挂钩,在完成任务后查看溢出队列中是否有任何内容。 这样,执行器首先会处理其有限队列中的内容,并在时间允许的情况下自动从该无限队列中提取。

这似乎比您的解决方案更费力,但至少不涉及给队列带来意外行为。 我还想象着有一种更好的方法来检查队列和线程的状态,而不是依赖于异常,因为异常抛出相当缓慢。


我不喜欢这个解决方案。我非常确定ThreadPoolExecutor不是为继承而设计的。 - scottb
1
JavaDoc中实际上有一个扩展的示例。他们指出,大多数人可能只会实现钩子方法,但他们告诉你在扩展时还需要注意什么。 - bstempi

0

注意:对于JDK ThreadPoolExecutor,当您有一个有界队列时,只有在offer返回false时才会创建新线程。您可以使用CallerRunsPolicy获得一些有用的东西,它会创建一些反压并直接在调用者线程中调用运行。

我需要从池创建的线程执行任务,并具有无限队列以进行调度,同时池中的线程数可能会增长缩小corePoolSizemaximumPoolSize之间,所以...

最终我从ThreadPoolExecutor进行了完全复制粘贴,并更改了execute方法,因为不幸的是这不能通过扩展来完成(它调用私有方法)。

我不想在新请求到达且所有线程都忙碌时立即生成新线程(因为我通常有短暂的任务)。我添加了一个阈值,但随时可以根据您的需求进行更改(也许对于大多数IO最好删除此阈值)。

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;

protected void beforeExecute(Thread t, Runnable r) {
    activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
    activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        if (isRunning(c) && this.workQueue.offer(command)) {
            int recheck = this.ctl.get();
            if (!isRunning(recheck) && this.remove(command)) {
                this.reject(command);
            } else if (workerCountOf(recheck) == 0) {
                this.addWorker((Runnable) null, false);
            }
            //>>change start
            else if (workerCountOf(recheck) < maximumPoolSize //
                && (activeWorkers.get() > workerCountOf(recheck) * threshold
                    || workQueue.size() > workerCountOf(recheck) * threshold)) {
                this.addWorker((Runnable) null, false);
            }
            //<<change end
        } else if (!this.addWorker(command, false)) {
            this.reject(command);
        }
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接