我曾经遇到过类似的问题,我通过使用ThreadPoolExecutor
的beforeExecute/afterExecute
钩子来实现:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
private final ReentrantLock taskLock = new ReentrantLock();
private final Condition unpaused = taskLock.newCondition();
private final int maxTaskCount;
private volatile int currentTaskCount;
public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue, int maxTaskCount) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.maxTaskCount = maxTaskCount;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
taskLock.lock();
try {
while (maxTaskCount < currentTaskCount) {
try {
unpaused.await();
} catch (InterruptedException e) {
t.interrupt();
}
}
currentTaskCount++;
} finally {
taskLock.unlock();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
taskLock.lock();
try {
currentTaskCount--;
unpaused.signalAll();
} finally {
taskLock.unlock();
}
}
}
这对您来说应该足够好了。顺便说一句,原始实现是基于任务大小的,因为一个任务可能比另一个大100倍,提交两个巨大的任务会导致问题,但运行一个大任务和很多小任务就没问题了。如果您的I/O密集型任务大小大致相同,可以使用这个类,否则请告诉我,我会发布基于大小的实现。
P.S. 您会想要查看 ThreadPoolExecutor 的 javadoc。这是 Doug Lea 关于如何轻松自定义它的非常好的用户指南。
numWorkerThreads
多一个任务。但更重要的问题是,如果调用线程获取了一个长时间运行的任务,则其他线程可能会闲置等待下一个任务。 - Tahir Akhtar