有没有可能创建一个具有大小限制的缓存线程池?

144

看起来似乎不可能创建一个带有线程数量限制的缓存线程池。

以下是标准Java库中静态Executors.newCachedThreadPool的实现方式:

 public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

因此,使用该模板来创建固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

现在如果你使用这个代码并提交3个任务,一切都会正常。再提交更多的任务将会导致“rejected execution exceptions”异常。

尝试以下代码:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

将导致所有线程顺序执行。也就是说,线程池永远不会创建多个线程来处理您的任务。

这是ThreadPoolExecutorexecute方法中的一个错误吗?还是故意为之?或者有其他方法吗?

编辑:我想要像缓存线程池一样的东西(它按需创建线程,然后在一些超时后终止它们),但限制其可以创建的线程数量,并且在达到线程限制后能够继续排队其他任务。根据sjlee的回答,这是不可能的。查看ThreadPoolExecutorexecute()方法,确实不可能。我需要子类化ThreadPoolExecutor并覆盖execute(),有点像SwingWorker所做的那样,但SwingWorker在其execute()中所做的是完全的hack。


1
你有什么问题吗?你的第二个代码示例不是回答了你的标题吗? - rsp
6
我希望有一个线程池,它会在任务数量增长时按需添加线程,但最多只会添加一些预设的最大数量的线程。CachedThreadPool已经可以实现这个功能,但它会添加无限数量的线程,并且不会停止于某个预定义的大小。在示例中我定义的大小是3。第二个示例添加了1个线程,但当其他任务尚未完成时,新任务到达时不会再添加两个线程。 - Matt Wonlaw
请查看此链接,它可以解决问题:http://debuggingisfun.blogspot.com/2012/05/java-cachedthreadpool-with-min-and-max.html - ethan
与:https://dev59.com/xWIk5IYBdhLWcg3wKLSd#19528305 相关。 - Gray
14个回答

2
问题概述如下:

我需要一个与缓存线程池完全相同的东西(它根据需要创建线程,然后在一定时间后将其终止),但它限制了它可以创建的线程数量,并且在达到线程限制后仍能继续排队附加任务。

在指出解决方案之前,我将解释以下解决方案为何不起作用:
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

当达到3个限制时,这将不会排队任何任务,因为根据定义,同步队列无法容纳任何元素。
new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

这不会创建超过一个线程,因为ThreadPoolExecutor仅在队列已满时才会创建超过corePoolSize的线程。但是LinkedBlockingQueue永远不会满。

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
    new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

这将不会重复利用线程,直到核心线程数已达到,因为ThreadPoolExecutor会增加线程数量,直到达到核心线程数,即使现有线程处于空闲状态。如果您可以接受这个缺点,那么这是问题的最简单解决方案。这也是《Java并发编程实践》中描述的解决方案(第172页的脚注)。
唯一完整的解决方案似乎是涉及覆盖队列的offer方法,并编写一个RejectedExecutionHandler,如本问题的答案所述:如何让ThreadPoolExecutor在排队之前增加线程到最大值?

1

这适用于Java8+(以及其他版本,目前为止..)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>()){{allowCoreThreadTimeOut(true);}};

这里的“3”是线程计数的限制,“5”是空闲线程的超时时间。

如果您想要自行检查是否有效,这里是完成此任务的代码:

public static void main(String[] args) throws InterruptedException {
    final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
    final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds

    System.out.println( java.lang.Thread.activeCount() + " threads");
    Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>()) {{allowCoreThreadTimeOut(true);}};

    System.out.println(java.lang.Thread.activeCount() + " threads");

    for (int i = 0; i < 5; i++) {
        final int fi = i;
        executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
    }
    System.out.println("If this is UP, it works");

    while (true) {
        System.out.println(
                java.lang.Thread.activeCount() + " threads");
        Thread.sleep(700);
    }

}

static void waitsout(String pre, String post, int timeout) {
    try {
        System.out.println(pre);
        Thread.sleep(timeout);
        System.out.println(post);
    } catch (Exception e) {
    }
}

上面代码的输出结果是:
1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads

0
  1. 你可以像@sjlee建议的那样使用ThreadPoolExecutor

    你可以动态控制线程池的大小。查看这个问题以获取更多细节:

    Dynamic Thread Pool

    或者

  2. 你可以使用newWorkStealingPool API,它是在Java 8中引入的。

    public static ExecutorService newWorkStealingPool()
    

    使用所有可用处理器作为目标并行级别创建一个工作窃取线程池。

默认情况下,并行级别设置为服务器中的 CPU 核心数。如果您有 4 核 CPU 服务器,则线程池大小将为 4。此 API 返回 ForkJoinPool 类型的 ExecutorService,并允许空闲线程窃取繁忙线程在 ForkJoinPool 中的任务。


0

我建议使用Signal方法

SignalExecutors类中:

ThreadPoolExecutor只有在提供的队列从offer()返回false时才会创建新线程。这意味着,如果您给它一个无界队列,它将只创建1个线程,无论队列变得多长。但是,如果您限制队列并提交的可运行项多于线程数,则任务将被拒绝并抛出异常。因此,我们创建了一个队列,如果非空则始终返回false,以确保创建新线程。然后,如果任务被拒绝,我们只需将其添加到队列中。

    public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) {
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads,
            maxThreads,
            30,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>() {
                @Override
                public boolean offer(Runnable runnable) {
                    if (size() > 1 && size() <= maxThreads) {
                        //create new thread
                        return false;
                    } else {
                        return super.offer(runnable);
                    }
                }
            }, new NumberedThreadFactory(name));

    threadPool.setRejectedExecutionHandler((runnable, executor) -> {
        try {
            executor.getQueue().put(runnable);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });

    return threadPool;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接