如何使ThreadPoolExecutor的submit()方法在饱和时阻塞?

113
我希望创建一个 ThreadPoolExecutor,当线程池达到最大容量并且阻塞队列已满时,submit() 方法会阻塞以避免添加新任务。这是否需要实现自定义的 RejectedExecutionHandler,或者是否有使用标准 Java 库实现该功能的现成方法?

2
你想要的东西是否类似于数组阻塞队列的offer()方法? - extraneon
2
@bacar 我不同意。这个问答看起来更有价值(除了它比较老)。 - JasonMArcher
16个回答

48

我刚刚找到的可能解决方案:

public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException, RejectedExecutionException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
            throw e;
        }
    }
}

还有其他的解决方案吗?我更喜欢基于RejectedExecutionHandler的解决方案,因为这似乎是处理这种情况的标准方式。


2
在finally子句中释放信号量和获取信号量之间存在竞态条件吗? - volni
2
如上所述,此实现存在缺陷,因为在任务完成之前释放了信号量。最好使用方法java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)。 - FelixM
3
@FelixM指出,在java.util.concurrent.ThreadPoolExecutor#runWorker(Worker w)中,使用java.util.concurrent.ThreadPoolExecutor#afterExecute(Runnable, Throwable)无法解决问题,因为在从队列中取下一个任务之前(查看openjdk 1.7.0.6的源代码),在task.run()后立即调用afterExecute。 - Jaan
2
这个答案来自Brian Goetz的《Java并发实践》一书。 - orangepips
14
这个回答并非完全正确,评论也是如此。这段代码确实来自于《Java并发编程实践》,如果考虑到上下文,它是正确的。该书明确地指出:“在这种方法中,使用一个无界队列(...),并将信号量的边界设置为等于池大小加上您想要允许的排队任务数量”。对于无界队列,任务永远不会被拒绝,因此重新抛出异常是完全没有意义的!我相信这也是为什么throw e;没有在该书中出现的原因。《Java并发编程实践》是正确的! - Timmos
显示剩余9条评论

31

您可以使用ThreadPoolExecutor和阻塞队列:

public class ImageManager {
    BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
    RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
    private ExecutorService executorService =  new ThreadPoolExecutor(numOfThread, numOfThread, 
        0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);

    private int downloadThumbnail(String fileListPath){
        executorService.submit(new yourRunnable());
    }
}

61
这会在提交线程上运行被拒绝的任务。从功能上来说,这不符合原帖作者的要求。 - Perception
你如何使用可调用对象来实现这个? - lochi
4
这会在调用线程中运行任务,而不是阻塞并将其放入队列,可能会产生一些负面影响,比如如果多个线程以这种方式调用它,则将运行多于“队列大小”的作业,如果任务花费的时间超出预期,你的“生产”线程可能无法保持执行器繁忙。但在这里运行得很好! - rogerdpack
4
被踩:这个不会在TPE饱和时阻塞。这只是一个替代方案,而不是解决方案。 - Timmos
3
已点赞:这基本符合“TPE设计”和通过给客户端线程溢出任务来自然阻塞它们的设计。这应该涵盖大多数使用情况,但当然不是所有情况,您应该理解其底层操作。 - Mike
显示剩余2条评论

13

您应该使用CallerRunsPolicy,它会在调用线程中执行被拒绝的任务。这样,它就不能向执行器提交任何新任务,直到该任务完成,此时将有一些空闲池线程或进程将重复。

http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html

从文档中得知:

被拒绝的任务

当执行器已关闭、同时执行器使用有限的最大线程和工作队列容量,并且已经饱和时,通过execute(java.lang.Runnable)方法提交的新任务将被拒绝。在任一情况下,execute方法都会调用其RejectedExecutionHandler的RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor)方法。提供了四个预定义的处理程序策略:

  1. 在默认的ThreadPoolExecutor.AbortPolicy中,处理程序在拒绝时抛出运行时RejectedExecutionException异常。
  2. 在ThreadPoolExecutor.CallerRunsPolicy中,调用execute方法的线程本身运行任务。这提供了一个简单的反馈控制机制,可以减缓新任务提交的速度。
  3. 在ThreadPoolExecutor.DiscardPolicy中,无法执行的任务将被简单地丢弃。
  4. 在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行器没有关闭,则工作队列头部的任务将被删除,然后重试执行(可能再次失败,因此需要重复执行)。

此外,在调用ThreadPoolExecutor构造函数时,请确保使用有界队列,例如ArrayBlockingQueue。否则,没有任何东西会被拒绝。

编辑:针对您的评论,将ArrayBlockingQueue的大小设置为线程池的最大大小,并使用AbortPolicy。

编辑2:好的,我明白你的意思了。那么这样怎么样:重写beforeExecute()方法来检查getActiveCount()是否超过getMaximumPoolSize(),如果超过,则睡眠并重试?


4
我希望同时执行的任务数量严格受限于执行器中的线程数(Executor中的线程数),因此我不能允许调用线程自行执行这些任务。 - Fixpoint
1
AbortPolicy会导致调用线程收到RejectedExecutionException,而我需要它只是阻塞。 - Fixpoint
2
我要求阻塞,而不是睡眠和轮询 ;) - Fixpoint
@danben:你是不是指的CallerRunsPolicy - user359996
7
CallerRunPolicy的问题在于,如果您有单线程生产者,并且一个长时间运行的任务被拒绝了(因为线程池中的其他任务已经完成,而长时间运行的任务仍在运行),则通常会出现未使用线程的情况。 - Adam Gent

8
我知道,这是一个hack,但在我看来,这是这里提供的最干净的hack ;-)
因为ThreadPoolExecutor使用阻塞队列“offer”而不是“put”,所以让我们覆盖阻塞队列的“offer”行为:
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {

    BlockingQueueHack(int size) {
        super(size);
    }

    public boolean offer(T task) {
        try {
            this.put(task);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }
}

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));

我测试过了,似乎可以正常工作。实现一些超时策略留给读者自己练习。


请参见https://dev59.com/zm855IYBdhLWcg3wIAga#4522411以获取此内容的清理版本。我同意,这是最干净的方法。 - Trenton

7
以下类将包装一个ThreadPoolExecutor,并使用Semaphore来阻止工作队列已满时的情况:
public final class BlockingExecutor { 

    private final Executor executor;
    private final Semaphore semaphore;

    public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
        this.semaphore = new Semaphore(queueSize + maxPoolSize);
    }

    private void execImpl (final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            // will never be thrown with an unbounded buffer (LinkedBlockingQueue)
            semaphore.release();
            throw e;
        }
    }

    public void execute (Runnable command) throws InterruptedException {
        execImpl(command);
    }
}

这个包装类基于Brian Goetz所著的Java并发实践一书中提出的解决方案。该书中的解决方案仅需要两个构造函数参数:一个Executor和一个用于信号量的绑定。这在Fixpoint给出的答案中已经说明了。但是,这种方法存在一个问题:当池线程都很忙,队列已满,但是信号量刚释放了许可证(在finally块中)时,会处于一种状态。在此状态下,新任务可以获取刚刚释放的许可证,但由于任务队列已满而被拒绝。显然,这不是你想要的;你希望在这种情况下阻塞。
为了解决这个问题,我们必须使用一个无限队列,正如JCiP中明确提到的那样。信号量作为保护器,产生虚拟队列大小的效果。这具有副作用,即单位可能包含maxPoolSize + virtualQueueSize + maxPoolSize个任务。为什么呢?因为在finally块中有semaphore.release()语句。如果所有池线程同时调用此语句,则会释放maxPoolSize个许可证,允许相同数量的任务进入单位。如果我们使用有界队列,它仍将被填满,从而导致拒绝任务。现在,因为我们知道这只会在池线程快要完成时发生,所以这不是问题。我们知道池线程不会阻塞,因此很快就会从队列中取出任务。
但你仍然可以使用有界队列。只需确保其大小等于virtualQueueSize + maxPoolSize即可。容量更大是无用的,信号量将阻止更多的项目进入。容量更小会导致拒绝任务的机会增加。例如,假设你想要一个具有maxPoolSize=2和virtualQueueSize=5的有界执行器。然后,使用一个带有7个许可证和实际队列大小为5+2=7的信号量。单位中实际可以存在的任务数量为2+5+2=9。当执行器已满(队列中有5个任务,线程池中有2个任务,因此没有许可证可用)并且所有池线程都释放了它们的许可证时,正好可以由2个任务使用这些许可证。
现在,JCiP中的解决方案有点麻烦,因为它没有强制执行所有这些约束条件(无限队列或具有数学限制的有界队列等)。我认为这只是一个很好的例子,展示了如何基于已经可用的部分构建新的线程安全类,而不是一个成熟、可重用的类。我认为作者的意图不是后者。

5
Hibernate有一个“BlockPolicy”非常简单,可能是你想要的:
请参见:Executors.java
/**
 * A handler for rejected tasks that will have the caller block until
 * space is available.
 */
public static class BlockPolicy implements RejectedExecutionHandler {

    /**
     * Creates a <tt>BlockPolicy</tt>.
     */
    public BlockPolicy() { }

    /**
     * Puts the Runnable to the blocking queue, effectively blocking
     * the delegating thread until space is available.
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        try {
            e.getQueue().put( r );
        }
        catch (InterruptedException e1) {
            log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
        }
    }
}

5
仔细想想,这个主意很糟糕。我不建议你使用它。具体原因请参见此处:https://dev59.com/RXA75IYBdhLWcg3wJFmY#3518588 - Nate Murray
此外,根据 OP 的请求,这不是使用“标准 Java 库”。删除吗? - user359996
4
哇,那太丑了。基本上这个解决方案会干扰 TPE 的内部。ThreadPoolExecutor 的 javadoc 甚至字面上说:“方法 getQueue() 允许访问工作队列以进行监视和调试。强烈不建议将此方法用于其他任何目的。” 看到这样一个如此广为人知的库中有这样的方法可用,绝对是令人悲哀的。 - Timmos
1
com.amazonaws.services.simpleworkflow.flow.worker.BlockCallerPolicy 是相似的。 - Adrian Baker

5
< p > 上述引用自《Java并发实践》的 < code > BoundedExecutor 只有在 Executor 使用无界队列,或者信号量限制不大于队列大小时才能正常工作。由于信号量是提交线程和池中线程之间共享的状态,因此即使队列大小 < bound <= (队列大小 + 池大小),也有可能使执行器饱和。

如果您的任务不会永远运行,则使用 < code > CallerRunsPolicy 是有效的,否则您的提交线程将永远停留在 < code > rejectedExecution 中,并且如果您的任务运行时间很长,则这是一个坏主意,因为如果它正在运行任务,则提交线程无法提交任何新任务或执行其他任何操作。

如果这不可接受,则建议在提交任务之前检查执行器的有界队列的大小。如果队列已满,请等待一段时间后再尝试提交。吞吐量会受到影响,但我认为这比许多其他提出的解决方案都要简单,并且您保证不会拒绝任何任务。


2
我不确定在多线程环境中有多个任务生产者的情况下,在提交之前检查队列长度如何保证没有被拒绝的任务。这听起来并不是线程安全的。 - Tim

0
我相信使用java.util.concurrent.Semaphore和委托Executor.newFixedThreadPool的行为,有一种非常优雅的方式来解决这个问题。 新的执行器服务只有在有线程执行任务时才会执行新任务。阻塞由许可证数量等于线程数的Semaphore管理。当任务完成时,它会返回一个许可证。
public class FixedThreadBlockingExecutorService extends AbstractExecutorService {

private final ExecutorService executor;
private final Semaphore blockExecution;

public FixedThreadBlockingExecutorService(int nTreads) {
    this.executor = Executors.newFixedThreadPool(nTreads);
    blockExecution = new Semaphore(nTreads);
}

@Override
public void shutdown() {
    executor.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
    return executor.shutdownNow();
}

@Override
public boolean isShutdown() {
    return executor.isShutdown();
}

@Override
public boolean isTerminated() {
    return executor.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return executor.awaitTermination(timeout, unit);
}

@Override
public void execute(Runnable command) {
    blockExecution.acquireUninterruptibly();
    executor.execute(() -> {
        try {
            command.run();
        } finally {
            blockExecution.release();
        }
    });
}

我实现了《Java并发编程实战》中描述的BoundedExecutor,并发现Semaphore必须使用公平性标志初始化,以确保Semaphore许可证按照请求顺序提供。有关详细信息,请参阅http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html。 - Prahalad Deshpande

0

您可以像这样使用自定义的RejectedExecutionHandler:

ThreadPoolExecutor tp= new ThreadPoolExecutor(core_size, // core size
                max_handlers, // max size 
                timeout_in_seconds, // idle timeout 
                TimeUnit.SECONDS, queue, new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // This will block if the queue is full
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            System.err.println(e.getMessage());
                        }

                    }
                });

1
getQueue()的文档明确提到,访问任务队列主要用于调试和监控。 - Chadi

0
我不总是喜欢CallerRunsPolicy,特别是它允许被拒绝的任务“跳过队列”并在先前提交的任务之前执行。此外,在调用线程上执行任务可能比等待第一个可用插槽要花费更长的时间。
我使用自定义的RejectedExecutionHandler解决了这个问题,它简单地阻塞调用线程一段时间,然后再次尝试提交任务。
public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

这个类可以像其他拒绝执行处理程序一样,在线程池执行器中使用,例如:

executorPool = new ThreadPoolExecutor(1, 1, 10,
                                      TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                                      new BlockWhenQueueFull());

我所看到的唯一缺点是,调用线程可能会被锁定时间略长于严格必要的时间(最多250毫秒)。此外,由于该执行程序实际上是递归调用的,等待线程可用的时间过长(几个小时)可能会导致堆栈溢出。
尽管如此,我个人喜欢这种方法。它紧凑、易于理解并且效果很好。

1
正如你所说:这可能会导致堆栈溢出。这不是我想在生产代码中遇到的问题。 - Harald
每个人都应该做出自己的决定。对于我的工作量来说,这不是问题。任务只需要几秒钟就能完成,而不是需要花费数小时来处理堆栈溢出的情况。此外,同样的情况也适用于几乎所有递归算法。但这是否意味着我们在生产中永远不应该使用任何递归算法呢? - TinkerTank

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接