如何解决 ThreadPoolExecutor
的限制,即队列需要被限定并填满后才能启动更多线程。
我相信我已经找到了一个比较优雅(也许有点取巧)的解决方案来解决 ThreadPoolExecutor
的限制。它涉及到扩展 LinkedBlockingQueue
使其在已经排队一些任务时返回 false
给 queue.offer(...)
。如果当前线程无法跟上排队的任务,则 TPE 将添加额外的线程。如果池中的线程已达到最大值,则将调用 RejectedExecutionHandler
来执行 put(...)
操作将任务放入队列中。
编写一个队列,其中offer(...)
可以返回false
而且put()
从不阻塞,这确实很奇怪,所以这是个取巧的地方。但是这与 TPE 对队列的使用非常契合,所以我认为没有什么问题。
以下是代码:
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 , 50 ,
60 , TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
使用这个机制时,当我将任务提交到队列时,
ThreadPoolExecutor
将会:
- 将线程数扩展到核心大小(这里是1)。
- 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
- 如果队列已经有一个或多个元素,则
offer(...)
将返回false。
- 如果返回false,则将线程池中的线程数量扩展到最大值(这里是50)。
- 如果已经达到最大值,则调用
RejectedExecutionHandler
RejectedExecutionHandler
然后将任务放入队列,以便由FIFO顺序中的第一个可用线程处理。
尽管在上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将容量设置为1000的LinkedBlockingQueue
,则它将:
- 将线程数扩展到最大值
- 然后排队,直到具有1000个任务的队列已满
- 然后阻止调用者,直到队列中有空间可用为止。
此外,如果您需要在RejectedExecutionHandler
中使用offer(...)
,则可以改用offer(E, long, TimeUnit)
方法,其中超时为Long.MAX_VALUE
。
警告:
如果您希望在执行程序已关闭后添加任务,则可能希望在自定义的RejectedExecutionHandler
中更加智能地抛出RejectedExecutionException
。感谢@RaduToader指出这一点。
编辑:
对此答案的另一个微调可能是询问TPE是否有空闲线程,并仅在有空闲线程时将项目排队。您需要为此创建一个真正的类,并在其上添加ourQueue.setThreadPoolExecutor(tpe);
方法。
然后,您的offer(...)
方法可能如下所示:
- 检查
tpe.getPoolSize() == tpe.getMaximumPoolSize()
,如果是,则只需调用super.offer(...)
。
- 否则,如果
tpe.getPoolSize() > tpe.getActiveCount()
,则调用super.offer(...)
,因为似乎有空闲线程。
- 否则返回
false
以分叉另一个线程。
也许是这样的:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
请注意,TPE上的get方法很昂贵,因为它们访问volatile字段或(在 getActiveCount()
的情况下)锁定TPE并遍历线程列表。此外,这里存在竞争条件,可能会导致任务被不正确地排队或在有空闲线程时另一个线程被分叉。