有没有可能创建一个具有大小限制的缓存线程池?

144

看起来似乎不可能创建一个带有线程数量限制的缓存线程池。

以下是标准Java库中静态Executors.newCachedThreadPool的实现方式:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因此,使用该模板来创建固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

现在如果你使用这个代码并提交3个任务,一切都会正常。再提交更多的任务将会导致“rejected execution exceptions”异常。

尝试以下代码:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

将导致所有线程顺序执行。也就是说,线程池永远不会创建多个线程来处理您的任务。

这是ThreadPoolExecutorexecute方法中的一个错误吗?还是故意为之?或者有其他方法吗?

编辑:我想要像缓存线程池一样的东西(它按需创建线程,然后在一些超时后终止它们),但限制其可以创建的线程数量,并且在达到线程限制后能够继续排队其他任务。根据sjlee的回答,这是不可能的。查看ThreadPoolExecutorexecute()方法,确实不可能。我需要子类化ThreadPoolExecutor并覆盖execute(),有点像SwingWorker所做的那样,但SwingWorker在其execute()中所做的是完全的hack。


1
你有什么问题吗?你的第二个代码示例不是回答了你的标题吗? - rsp
6
我希望有一个线程池,它会在任务数量增长时按需添加线程,但最多只会添加一些预设的最大数量的线程。CachedThreadPool已经可以实现这个功能,但它会添加无限数量的线程,并且不会停止于某个预定义的大小。在示例中我定义的大小是3。第二个示例添加了1个线程,但当其他任务尚未完成时,新任务到达时不会再添加两个线程。 - Matt Wonlaw
请查看此链接,它可以解决问题:http://debuggingisfun.blogspot.com/2012/05/java-cachedthreadpool-with-min-and-max.html - ethan
与:https://dev59.com/xWIk5IYBdhLWcg3wKLSd#19528305 相关。 - Gray
14个回答

245
ThreadPoolExecutor有以下几个关键的行为,你遇到的问题可以通过这些行为来解释:
1. 当任务被提交时, - 如果线程池还没有达到核心大小,它会创建新的线程。 - 如果已经达到核心大小并且没有空闲线程,则将任务加入队列。 - 如果已经达到核心大小,没有空闲线程,并且队列已满,则创建新的线程(直到达到最大大小)。 - 如果已经达到最大大小,没有空闲线程,并且队列已满,则会触发拒绝策略。
在第一个示例中,请注意SynchronousQueue实际上大小为0。因此,当达到最大大小(3)时,将触发拒绝策略(#4)。
在第二个示例中,所选择的队列是LinkedBlockingQueue,它具有无限大小。因此,你会遇到行为#2。
你不能太多地操作缓存类型或固定类型,因为它们的行为几乎完全确定。
如果你想拥有有界和动态的线程池,你需要使用正数的核心大小和最大大小,再结合一个有限大小的队列。例如,
new ThreadPoolExecutor(10, // core size
    50, // max size
    10*60, // idle timeout
    TimeUnit.SECONDS,
    new ArrayBlockingQueue<Runnable>(20)); // queue with a size

补充: 这是一个相当旧的答案,而且似乎 JDK 在处理核心池大小为 0 的情况时已经改变了行为。自 JDK 1.6 以来,如果核心池大小为 0 而线程池没有任何线程,则 ThreadPoolExecutor 将添加一个线程来执行该任务。因此,核心池大小为 0 是上述规则的例外情况。感谢 Steve 提供的 提醒


6
allowCoreThreadTimeOut 是一个线程池中的方法,允许核心线程在空闲时间超时。这意味着即使没有任务需要执行,核心线程也可以在空闲一段时间后被终止。如果您希望在某些情况下释放资源,则可以使用此方法。请查看@user1046052的答案了解更多详细信息。 - hsestupin
1
好答案!只有一个要点需要补充:其他的拒绝策略也值得一提。请参考 @brianegge 的回答。 - Jeff
1
行为2不应该说“如果已达到maxThread大小且没有空闲线程,则将任务排队”吗? - Zoltán
1
你能详细说明一下队列大小的含义吗?这是否意味着只有20个任务可以排队,排满后就会被拒绝? - Zoltán
1
@Zoltán 我之前写过这个,所以有可能自那时起某些行为已经发生了变化(我没有太密切地关注最新的活动),但是假设这些行为没有改变,那么#2就像陈述的那样是正确的,这也许是最重要的(并且有些令人惊讶)一点。一旦核心大小达到,TPE会优先选择排队而不是创建新线程。队列大小就是传递给TPE的队列大小。如果队列变满了但还没有达到最大大小,它将创建一个新线程(而不是拒绝任务)。请参见#3。希望这可以帮助到你。 - sjlee
显示剩余3条评论

64
除非我漏了什么,否则原问题的解决方案很简单。以下代码实现了原帖中描述的所需行为。它将生成最多5个线程来处理无界队列,空闲线程将在60秒后终止。
tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

1
你是正确的。该方法是在jdk 1.6中添加的,所以不是很多人知道它。此外,“min”核心池大小不能为零,这很遗憾。 - jtahlborn
4
我对此的唯一担忧是(来自JDK 8文档):“当在方法execute(Runnable)中提交新任务且运行的线程少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。” - veegee
很确定这实际上不起作用。 上次我尝试过,即使您生成了5个线程,它实际上只在一个线程中运行您的工作。 再次说明,已经过去几年了,但当我深入研究ThreadPoolExecutor的实现时,它只有在队列满时才会分派到新线程。 使用无界队列会导致此情况永远不会发生。 您可以通过提交工作并记录线程名称然后休眠来进行测试。每个可运行项最终都将打印相同的名称/不在任何其他线程上运行。 - Matt Wonlaw
2
这个是有效的,Matt。你将核心大小设置为0,所以只有1个线程。这里的诀窍是将核心大小设置为最大值。 - T-Gergely
1
@vegee 是正确的 - 这并不是非常有效 - ThreadPoolExecutor 只有在超过 corePoolSize 时才会重用线程。因此,当 corePoolSize 等于 maxPoolSize 时,只有在池满时才能从线程缓存中受益(因此,如果您打算使用此功能但通常保持在最大池大小以下,那么您可以将线程超时时间降低到较低值;并且请注意,没有缓存 - 总是新线程)。 - Chris Riddell
@Riddell 是正确的。它不像具有线程限制和无界队列的 CachedThreadPool 一样运行,因为它不会重用线程。ThreadPoolExecutor 可以提供两个主要优点:1)通过重用线程减少执行任务的开销;2)限制使用的资源数量(例如线程)。但是这种解决方案不提供前者。 - Stefan Feuerhahn

8

我也遇到了同样的问题。由于没有其他答案将所有问题整合在一起,因此我要补充我的答案:

现在在文档中明确写着:如果您使用一个不会阻塞的队列(LinkedBlockingQueue),则最大线程数设置无效,只有核心线程被使用。

因此:

public class MyExecutor extends ThreadPoolExecutor {

    public MyExecutor() {
        super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        allowCoreThreadTimeOut(true);
    }

    public void setThreads(int n){
        setMaximumPoolSize(Math.max(1, n));
        setCorePoolSize(Math.max(1, n));
    }

}

这个执行器具有以下特点:
  1. 没有最大线程的概念,因为我们使用的是无界队列。这是一件好事,因为这样的队列可能会按照惯例导致执行器创建大量非核心、额外的线程。

  2. 队列的最大大小为Integer.MAX_VALUE。如果待处理任务的数量超过了Integer.MAX_VALUE,则submit()将抛出RejectedExecutionException异常。不确定我们首先会耗尽内存还是发生这种情况。

  3. 最多可以具有4个核心线程。空闲的核心线程如果闲置5秒钟,则会自动退出。因此,严格按需线程数,可以使用setThreads()方法来更改数量。

  4. 确保核心线程的最小数量永远不会少于1,否则submit()将拒绝每个任务。由于核心线程需要>=最大线程,所以setThreads()方法设置最大线程,但对于无界队列而言,最大线程设置是无用的。


我认为你还需要将'allowCoreThreadTimeOut'设置为'true',否则,一旦线程被创建,它们将永远存在:https://gist.github.com/ericdcobb/46b817b384f5ca9d5f5d - eric
抱歉,我刚才错过了,你的答案是完美的! - eric

7
在你的第一个示例中,由于 AbortPolicy 是默认的 RejectedExecutionHandler,因此后续任务被拒绝。ThreadPoolExecutor 包含以下策略,您可以通过 setRejectedExecutionHandler 方法进行更改:
CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

听起来你想要一个具有CallerRunsPolicy的缓存线程池。

5

这里的回答都没有解决我的问题,我的问题与使用Apache的HTTP客户端(3.x版本)创建有限数量的HTTP连接有关。由于我花了几个小时才找到一个好的设置方案,所以我想分享一下:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
  TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
  Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

这将创建一个ThreadPoolExecutor,它从五个线程开始,并最多同时运行十个线程,使用CallerRunsPolicy来执行。

这个解决方案的问题在于,如果你增加生产者的数量,你将会增加后台线程的数量。在许多情况下,这并不是你想要的。 - Gray

3
根据ThreadPoolExecutor的Javadoc文档:
如果正在运行的线程数超过corePoolSize但少于maximumPoolSize,则只有在队列已满的情况下才会创建新线程。通过将corePoolSize和maximumPoolSize设置为相同,您可以创建固定大小的线程池。
(强调是我的。)
jitter的答案是你想要的,尽管我的回答了你的另一个问题。 :)

2

看起来没有任何答案实际回答了这个问题 - 实际上我找不到一种方法来做到这一点 - 即使您从PooledExecutorService的子类从属,因为许多方法/属性都是私有的,例如使addIfUnderMaximumPoolSize受保护,您可以执行以下操作:

class MyThreadPoolService extends ThreadPoolService {
    public void execute(Runnable run) {
        if (poolSize() == 0) {
            if (addIfUnderMaximumPoolSize(run) != null)
                return;
        }
        super.execute(run);
    }
}

我最接近的解决方案是这个,但即使如此也不是一个很好的解决方案。
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
    public void execute(Runnable command) {
        if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {        
            super.setCorePoolSize(super.getCorePoolSize() + 1);
        }
        super.execute(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
         // nothing in the queue
         if (getQueue().isEmpty() && getPoolSize() > min) {
             setCorePoolSize(getCorePoolSize() - 1);
         }
    };
 };

附注:以上内容未经测试


2

这里有另一种解决方案。我认为这个解决方案的行为符合你想要的要求(尽管我对这个解决方案并不感到自豪):

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    public boolean offer(Runnable o) {
        if (size() > 1)
            return false;
        return super.offer(o);
    };

    public boolean add(Runnable o) {
        if (super.offer(o))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }
};

RejectedExecutionHandler handler = new RejectedExecutionHandler() {         
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        queue.add(r);
    }
};

dbThreadExecutor =
        new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

2

这是你想要的(至少我猜是这样)。如需解释,请查看Jonathan Feinberg的答案

Executors.newFixedThreadPool(int n)

创建一个线程池,它重用一定数量的线程,并使用共享无界队列。在任何时候,最多只有nThreads个线程会活跃地处理任务。如果所有线程都处于活动状态并提交了额外的任务,则它们将等待在队列中,直到有线程可用。如果任何线程在关闭之前由于执行期间的失败而终止,则如果需要执行后续任务,则会有一个新线程取代它。线程池中的线程将一直存在,直到显式关闭。


7
当然,我可以使用一个固定大小的线程池,但这会使得n个线程一直存在,直到我调用shutdown方法。我想要的是一个与缓存线程池完全相同的东西(按需创建线程,并在一段时间后销毁它们),但是它有一个可以创建的线程数量限制。 - Matt Wonlaw

2

还有一种选择。可以使用任何其他队列,而不是使用新的SynchronousQueue,但必须确保其大小为1,这将强制executorservice创建新的线程。


我认为你的意思是默认大小为0,这样就不会有任务排队,真正强制executorservice每次都创建新线程。 - Leonmax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接