ThreadPoolExecutor
,当线程池达到最大容量并且阻塞队列已满时,submit()
方法会阻塞以避免添加新任务。这是否需要实现自定义的 RejectedExecutionHandler
,或者是否有使用标准 Java 库实现该功能的现成方法?ThreadPoolExecutor
,当线程池达到最大容量并且阻塞队列已满时,submit()
方法会阻塞以避免添加新任务。这是否需要实现自定义的 RejectedExecutionHandler
,或者是否有使用标准 Java 库实现该功能的现成方法?我刚刚找到的可能解决方案:
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
还有其他的解决方案吗?我更喜欢基于RejectedExecutionHandler
的解决方案,因为这似乎是处理这种情况的标准方式。
throw e;
没有在该书中出现的原因。《Java并发编程实践》是正确的! - Timmos您可以使用ThreadPoolExecutor和阻塞队列:
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
您应该使用CallerRunsPolicy
,它会在调用线程中执行被拒绝的任务。这样,它就不能向执行器提交任何新任务,直到该任务完成,此时将有一些空闲池线程或进程将重复。
从文档中得知:
被拒绝的任务
当执行器已关闭、同时执行器使用有限的最大线程和工作队列容量,并且已经饱和时,通过execute(java.lang.Runnable)方法提交的新任务将被拒绝。在任一情况下,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)方法。提供了四个预定义的处理程序策略:
- 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException异常。
- 在ThreadPoolExecutor.CallerRunsPolicy中,调用execute方法的线程本身运行任务。这提供了一个简单的反馈控制机制,可以减缓新任务提交的速度。
- 在ThreadPoolExecutor.DiscardPolicy中,无法执行的任务将被简单地丢弃。
- 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器没有关闭,则工作队列头部的任务将被删除,然后重试执行(可能再次失败,因此需要重复执行)。
此外,在调用ThreadPoolExecutor构造函数时,请确保使用有界队列,例如ArrayBlockingQueue。否则,没有任何东西会被拒绝。
编辑:针对您的评论,将ArrayBlockingQueue的大小设置为线程池的最大大小,并使用AbortPolicy。
编辑2:好的,我明白你的意思了。那么这样怎么样:重写beforeExecute()
方法来检查getActiveCount()
是否超过getMaximumPoolSize()
,如果超过,则睡眠并重试?
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
我测试过了,似乎可以正常工作。实现一些超时策略留给读者自己练习。
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
ThreadPoolExecutor
的 javadoc 甚至字面上说:“方法 getQueue() 允许访问工作队列以进行监视和调试。强烈不建议将此方法用于其他任何目的。” 看到这样一个如此广为人知的库中有这样的方法可用,绝对是令人悲哀的。 - Timmos如果您的任务不会永远运行,则使用 < code > CallerRunsPolicy 是有效的,否则您的提交线程将永远停留在 < code > rejectedExecution 中,并且如果您的任务运行时间很长,则这是一个坏主意,因为如果它正在运行任务,则提交线程无法提交任何新任务或执行其他任何操作。
如果这不可接受,则建议在提交任务之前检查执行器的有界队列的大小。如果队列已满,请等待一段时间后再尝试提交。吞吐量会受到影响,但我认为这比许多其他提出的解决方案都要简单,并且您保证不会拒绝任何任务。
java.util.concurrent.Semaphore
和委托Executor.newFixedThreadPool
的行为,有一种非常优雅的方式来解决这个问题。
新的执行器服务只有在有线程执行任务时才会执行新任务。阻塞由许可证数量等于线程数的Semaphore管理。当任务完成时,它会返回一个许可证。public class FixedThreadBlockingExecutorService extends AbstractExecutorService {
private final ExecutorService executor;
private final Semaphore blockExecution;
public FixedThreadBlockingExecutorService(int nTreads) {
this.executor = Executors.newFixedThreadPool(nTreads);
blockExecution = new Semaphore(nTreads);
}
@Override
public void shutdown() {
executor.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executor.shutdownNow();
}
@Override
public boolean isShutdown() {
return executor.isShutdown();
}
@Override
public boolean isTerminated() {
return executor.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
blockExecution.acquireUninterruptibly();
executor.execute(() -> {
try {
command.run();
} finally {
blockExecution.release();
}
});
}
您可以像这样使用自定义的RejectedExecutionHandler:
ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
max_handlers, // max size
timeout_in_seconds, // idle timeout
TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// This will block if the queue is full
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
System.err.println(e.getMessage());
}
}
});
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
executorPool = new ThreadPoolExecutor(1, 1, 10,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new BlockWhenQueueFull());