创建一个动态(增长/缩小)的线程池

20

我需要在Java中实现一个线程池(java.util.concurrent),它的线程数在空闲时保持在某个最小值,当任务提交速度快于任务执行完成速度时,线程数增加到一个上限(但不会超过),当所有任务都完成且没有更多任务提交时,线程数将缩小至下限。

你会如何实现这样的功能?我想这应该是一个相当常见的用法场景,但明显 java.util.concurrent.Executors 工厂方法只能创建固定大小或无限制增长的线程池。ThreadPoolExecutor 类提供了 corePoolSize 和 maximumPoolSize 参数,但其文档似乎暗示了要想同时拥有超过 corePoolSize 线程数,唯一的方式是使用有界任务队列,在这种情况下,如果已达到 maximumPoolSize 的线程数,您将得到任务拒绝,需要自己处理?我想出了以下解决方案:

//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(minSize));
...

//submitting jobs
for (Runnable job : ...) {
    while (true) {
        try {
            pool.submit(job);
            System.out.println("Job " + job + ": submitted");
            break;
        } catch (RejectedExecutionException e) {
            // maxSize jobs executing concurrently atm.; re-submit new job after short wait
            System.out.println("Job " + job + ": rejected...");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e1) {
            }
        }
    }
}

我有没有忽略什么?有更好的方法吗?另外,根据要求,上述代码可能会存在问题,直到至少完成了(我认为)(总作业数)- maxSize个作业。因此,如果您希望能够将任意数量的作业提交到池中并立即进行而不等待任何作业完成,我不知道您是否可以在没有专用的“作业提交”线程的情况下实现它,该线程管理所需的无界队列以容纳所有已提交的作业。 据我所见,如果您正在为 ThreadPoolExecutor 使用无界队列本身,则其线程计数永远不会超过 corePoolSize。


3
我必须承认,我没能看出动态大小线程池的用处。您的应用程序在运行期间的处理器数量是否会发生变化? - corsiKa
3
为什么newCachedThreadPool不适合你的情况?它会自动关闭不再使用的线程。 - Tudor
如果您的空闲线程没有死亡会发生什么?假设您一直拥有最大尺寸的固定大小池,会发生什么? - Peter Lawrey
1
据我所知,newCachedThreadPool创建了一个线程池,如果您将许多长时间运行的作业提交到其中,则线程数可能会无限增长。 - Olaf Klischat
5
在运行时处理器数量不会改变,但如果任务主要受限于I/O而非CPU,则我看不出这与相关性何在。在这种情况下,即使在单处理器系统上使用多个线程,您也可以实现增加的任务吞吐量。 - Olaf Klischat
3个回答

14

当线程的增长和收缩与程序同时出现时,我想到的只有一个名字:来自java.util.concurrent包的CachedThreadPool

ExecutorService executor = Executors.newCachedThreadPool();

CachedThreadPool(缓存线程池)可以重复使用线程,同时在需要时创建新线程。 而且,如果一个线程闲置了60秒,CachedThreadPool会终止它。因此这非常轻量级——根据您的需求增长和收缩!


11
正确,但它没有界限。 - Gray
3
您可以使用底层的ThreadPoolExecutor,并手动设置最大线程池大小,甚至可以在运行时进行设置。 - Askar Kalykov

8
一个小技巧是指定一个RejectedExecutionHandler,该处理程序使用相同的线程将作业提交到阻塞队列中。这将阻塞当前线程并消除某些循环的需要。
参见我在此处的答案:

如何使ThreadPoolExecutor命令等待,如果它需要处理太多数据?

下面是从那个答案中复制的拒绝处理程序。
final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
       0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will call the rejected
// handler when you submit the 201st job, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
      // this will block if the queue is full
      executor.getQueue().put(r);
   }
});

只要您知道在创建核心线程之前,先使用有界阻塞队列填充,就可以利用核心/最大线程数。因此,如果您有10个核心线程,并且想要第11个作业启动第11个线程,不幸的是您需要一个大小为0的阻塞队列(可能是SynchronousQueue)。我觉得这在否则非常好的ExecutorService类中是一个真正的限制。


糟糕。我正在尝试找到一种方法,使池大小随着队列填充而增长,而不是等待直到满。啊好吧。 - zerpsed
1
你可以这样做@zerpsed。根据队列中的项目数量,您可以调整核心线程数。 - Gray
我正在使用LinkedBlockingQueue,并希望保持空闲线程数量较少。增加核心大小将增加我所持有的空闲线程数,或者我可能误解了信息。在负载下,SynchronousQueue表现不佳,我认为这是由于它似乎强制执行1:1任务到可用线程的关系。看起来我可以监视队列大小并在负载下调整核心数,当队列为空时则减少核心数。我曾希望ThreadPoolExecutor会为我完成这个操作,但除非队列已满,否则不会创建超出核心大小的新线程。 - zerpsed
你应该看一下我在 @zerpsed 这里的答案。 https://dev59.com/xWIk5IYBdhLWcg3wKLSd 该问题(以及我的答案)可以让你在队列满之前扩展线程。那可能是你需要的全部。 - Gray
谢谢分享,非常酷。在回来之前,我构建了一些逻辑来根据队列大小调整线程规模。到目前为止,在负载下表现良好。 - zerpsed

1

maximumPoolSize设置为Integer.MAX_VALUE。如果您有超过20亿个线程...好运吧。

无论如何,ThreadPoolExecutor的Javadoc说明如下:

通过将maximumPoolSize设置为诸如Integer.MAX_VALUE之类的基本无限值,您可以允许池容纳任意数量的并发任务。通常情况下,仅在构建时设置核心和最大池大小,但也可以使用setCorePoolSize(int)和setMaximumPoolSize(int)动态更改它们。

使用类似于LinkedBlockingQueue的同样无限制的任务队列,这应该具有任意大的容量。


1
感谢您还参考了这个链接:https://dev59.com/NYfca4cB1Zd3GeqPp_MK#40384042。缩小线程池大小可能会引发其他问题,这些问题在其他问题中得到了解决。 - V H

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接