Java:在特定队列大小后阻止提交的ExecutorService

94

我正在尝试编写一个解决方案,其中单个线程会生成I/O密集型任务,可以并行执行。每个任务都具有重要的内存数据。因此,我希望能够限制在某一时刻挂起的任务数量。

如果我像这样创建ThreadPoolExecutor:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

当队列填满并且所有线程都已经忙碌时,executor.submit(callable)会抛出RejectedExecutionException异常。

我该如何使executor.submit(callable)在队列已满并且所有线程都已忙碌时阻塞?

编辑: 我尝试了这个方法:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它达到了我想要的效果,但方式不够优雅(被拒绝的线程在调用线程中运行,这会阻塞调用线程以提交更多内容)。

编辑:(提问后5年)

对于任何阅读此问题及其答案的人,请不要将被接受的答案视为唯一正确的解决方案。请仔细阅读所有答案和评论。


1
我以前使用过信号量来做这件事,就像@axtavt链接的非常相似的问题的答案中所示。 - Stephen Denne
我认为你使用CallerRunsPolicy的解决方案是完美的。你为什么会称其为不优雅呢? - TomWolk
2
@TomWolk 首先,当调用线程也在执行任务时,您可以并行执行比 numWorkerThreads 多一个任务。但更重要的问题是,如果调用线程获取了一个长时间运行的任务,则其他线程可能会闲置等待下一个任务。 - Tahir Akhtar
2
@TahirAkhtar,没错;队列应该足够长,这样当调用者必须自己执行任务时,它就不会耗尽。但我认为,如果可以使用一个以上的线程(即调用者线程)来执行任务,那么这是一个优势。如果调用者只是阻塞,那么调用者的线程将处于空闲状态。我使用了CallerRunsPolicy,并将队列容量设置为线程池容量的三倍,效果非常好。与此解决方案相比,我认为修改框架是过度设计。 - TomWolk
1
@TomWalk +1好观点。另一个区别似乎是,如果任务被从队列中拒绝并由调用线程运行,则调用线程将开始无序处理请求,因为它没有等待在队列中的顺序。当然,如果您已经选择使用线程,则必须正确处理任何依赖关系,但仅需牢记此事。 - rimsky
显示剩余2条评论
8个回答

66

我也做过类似的事情。诀窍在于创建一个阻塞队列,其中 offer() 方法实际上是 put() 方法(你可以使用任何基本的 BlockingQueue 实现)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

请注意,这仅适用于线程池中 corePoolSize==maxPoolSize 的情况,因此请小心使用(请参见注释)。


2
或者你可以扩展SynchronousQueue以防止缓冲,只允许直接交付。 - brendon
优雅而直接地解决了问题。offer() 变成了 put(),put() 的意思是“……等待空间可用”。 - Trenton
7
我认为这不是一个好主意,因为它会改变Offer方法的协议。Offer方法应该是一个非阻塞调用。 - Mingjiang Shi
9
我不同意-这会改变ThreadPoolExecutor.execute的行为,使得如果你的corePoolSize小于maxPoolSize,ThreadPoolExecutor逻辑永远不会在核心线程数之外添加额外的工作线程。 - Krease
5
为了澄清,您的解决方案仅在保持 corePoolSize==maxPoolSize 约束条件时有效。否则,它将不再让 ThreadPoolExecutor 具有所设计的行为。我正在寻找一种没有此限制的解决方案;请查看下面的我的备选答案,以获取我们最终采纳的方法。 - Krease
显示剩余2条评论

17

目前被接受的答案有一个可能存在重大问题——它会改变ThreadPoolExecutor.execute的行为,导致如果您的corePoolSize < maxPoolSize,ThreadPoolExecutor逻辑永远不会在核心线程池之外添加额外的工作线程。

引用自ThreadPoolExecutor.execute(Runnable):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

具体来说,最后一个“else”块永远不会被执行。

更好的选择是采用与 OP 已经在做的类似方式 - 使用RejectedExecutionHandler来执行相同的put逻辑:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

如评论所指出的那样(参见这个答案),这种方法有一些需要注意的地方:

  1. 如果corePoolSize==0,则存在一种竞争条件,导致任务在可见之前线程池中的所有线程可能都会死亡。
  2. 使用包装队列任务的实现(不适用于ThreadPoolExecutor)将导致问题,除非处理程序也以相同的方式进行包装。

牢记这些要点,此解决方案将适用于大多数典型的ThreadPoolExecutors,并且将正确处理corePoolSize < maxPoolSize的情况。


给那位点了踩的人 - 你能提供一些见解吗?这个答案有什么不正确/误导/危险的地方吗?我想有机会解决你的顾虑。 - Krease
2
我没有给您的帖子点踩,但是这个想法似乎非常糟糕。参考链接 - vanOekel
@vanOekel - 感谢提供链接 - 这个答案提出了一些有效的情况,如果使用这种方法应该知道,但在我看来并不是一个“非常糟糕的想法” - 它仍然解决了当前接受答案中存在的问题。我已经更新了我的答案,并加入了这些注意事项。 - Krease
Java 8 执行 execute() 的实现会处理这个条件: 如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加线程(因为自上次检查以来已有线程死亡),或者池在进入此方法后关闭。因此,我们重新检查状态,如果必要的话,在停止时回滚排队,或者如果没有线程,则启动新线程。否则,如果 (workerCountOf(recheck) == 0),则添加工作线程(addWorker(null, false))。 - madcolonel10
我喜欢这些,似乎比创建自己的LinkedBlockingQueue实现更清晰。但是...我已经尝试过了,即使我将maxPoolSize设置为五,它仍然会旋转八个线程。 - Krzysztof Tkacz
显示剩余2条评论

16
以下是我解决此问题的方法:
(注意:此解决方案会阻止提交Callable的线程,因此可以防止抛出RejectedExecutionException)
格式未变,保留HTML标签。
public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}

1
我认为这对于 corePoolSize < maxPoolSize 的情况效果不佳... :| - rogerdpack
1
它适用于 corePoolSize < maxPoolSize 的情况。在这些情况下,信号量将可用,但不会有线程,并且 SynchronousQueue 将返回 false。然后,ThreadPoolExecutor 将启动一个新线程。这种解决方案的问题是它存在竞态条件。在 semaphore.release() 之后,但在线程完成 execute 之前,submit() 将获取信号量许可。如果 super.submit() 在 execute() 完成之前运行,则作业将被拒绝。 - Luís Guilherme
@LuísGuilherme 但 semaphore.release() 方法不会在线程执行结束之前被调用。因为该调用是在 afterExecute(...) 方法中完成的。我是否在您所描述的情况中忽略了什么? - cvacca
2
afterExecute被同一个运行任务的线程调用,因此任务还没有完成。自己进行测试,实现该解决方案,并向执行程序投入大量工作,如果工作被拒绝则抛出异常。您会发现这确实存在竞争条件,并且很容易重现。 - Luís Guilherme
2
前往ThreadPoolExecutor并检查runWorker(Worker w)方法。您会发现,在afterExecute完成后,包括解锁工作程序和增加已完成任务数的操作。因此,您允许任务进入(通过释放信号量),而没有带宽来处理它们(通过调用processWorkerExit)。 - Luís Guilherme

5
我知道这是一个旧问题,但我遇到了类似的问题:创建新任务非常快,如果存在太多未完成的任务,则会出现OutOfMemoryError。
在我的情况下,我提交了可调用对象(Callables),因此需要结果,因此需要存储通过executor.submit()返回的所有Futures。我的解决方案是将Futures放入具有最大大小的BlockingQueue中。一旦队列已满,将不再生成更多任务,直到某些任务完成(从队列中删除元素)。伪代码如下:
final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads);
final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize);
try {   
    Thread taskGenerator = new Thread() {
        @Override
        public void run() {
            while (reader.hasNext) {
                Callable task = generateTask(reader.next());
                Future future = executor.submit(task);
                try {
                    // if queue is full blocks until a task
                    // is completed and hence no future tasks are submitted.
                    futures.put(future);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();         
                }
            }
        executor.shutdown();
        }
    }
    taskGenerator.start();
    
    // read from queue as long as task are being generated
    // or while Queue has elements in it
    while (taskGenerator.isAlive()
                    || !futures.isEmpty()) {
        Future future = futures.take();
        // do something
    }
} catch (InterruptedException ex) {
    Thread.currentThread().interrupt();     
} catch (ExecutionException ex) {
    throw new MyException(ex);
} finally {
    executor.shutdownNow();
}

复合未来是用来做什么的? - T Ravi Theja
这是变量的原始名称,我在这个例子中没有一致地“重命名”它。 - beginner_

5

如果你正在使用 Spring Integration,那么使用 CallerBlocksPolicy 类如何呢?

该类实现了 RejectedExecutionHandler 接口,它是无法由 ThreadPoolExecutor 执行的任务的处理程序。

你可以像这样使用此策略。

executor.setRejectedExecutionHandler(new CallerBlocksPolicy());
CallerBlocksPolicyCallerRunsPolicy的主要区别在于它们是阻塞调用线程还是在调用线程中运行任务。
请参考这个代码

1
看起来是个不错的选择。如果它在单独的实用库中,那么使用起来会更容易。 - Tahir Akhtar

3

我曾经遇到过类似的问题,我通过使用ThreadPoolExecutorbeforeExecute/afterExecute钩子来实现:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Blocks current task execution if there is not enough resources for it.
 * Maximum task count usage controlled by maxTaskCount property.
 */
public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    private final ReentrantLock taskLock = new ReentrantLock();
    private final Condition unpaused = taskLock.newCondition();
    private final int maxTaskCount;

    private volatile int currentTaskCount;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Executes task if there is enough system resources for it. Otherwise
     * waits.
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            // Spin while we will not have enough capacity for this job
            while (maxTaskCount < currentTaskCount) {
                try {
                    unpaused.await();
                } catch (InterruptedException e) {
                    t.interrupt();
                }
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Signalling that one more task is welcome
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }
}

这对您来说应该足够好了。顺便说一句,原始实现是基于任务大小的,因为一个任务可能比另一个大100倍,提交两个巨大的任务会导致问题,但运行一个大任务和很多小任务就没问题了。如果您的I/O密集型任务大小大致相同,可以使用这个类,否则请告诉我,我会发布基于大小的实现。
P.S. 您会想要查看 ThreadPoolExecutor 的 javadoc。这是 Doug Lea 关于如何轻松自定义它的非常好的用户指南。

1
我在想,当一个线程在 beforeExecute() 中持有锁并且看到 maxTaskCount < currentTaskCount 并开始等待 unpaused 条件时,会发生什么。同时,另一个线程尝试在 afterExecute() 中获取锁来通知任务完成。这不会导致死锁吗? - Tahir Akhtar
1
我还注意到,当队列已满时,这种解决方案不会阻止提交任务的线程。因此,RejectedExecutionException 仍然是可能发生的。 - Tahir Akhtar
1
ReentrantLock/Condition类的语义与synchronized/wait和notify提供的相似。当调用条件等待方法时,锁将被释放,因此不会发生死锁。 - Petro Semeniuk
这个ExecutorService在提交任务时会阻塞任务,而不会阻塞调用者线程。任务只是被提交了,当系统有足够的资源时,它将异步地被处理。 - Petro Semeniuk
这个使用afterExecute的解决方案以及其他任何解决方案都受到@Luís Guilherme在https://dev59.com/zm855IYBdhLWcg3wIAga#24420823上对另一个答案的评论中描述的竞争条件的影响。 - Tony

3

我已经实现了一个解决方案,遵循装饰器模式并使用信号量来控制执行任务的数量。您可以将其与任何Executor一起使用,并:

  • 指定正在进行的任务的最大数量
  • 指定等待任务执行许可的最长时间(如果超时并且未获得许可,则会抛出RejectedExecutionException
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import javax.annotation.Nonnull;

public class BlockingOnFullQueueExecutorDecorator implements Executor {

    private static final class PermitReleasingDecorator implements Runnable {

        @Nonnull
        private final Runnable delegate;

        @Nonnull
        private final Semaphore semaphore;

        private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) {
            this.delegate = task;
            this.semaphore = semaphoreToRelease;
        }

        @Override
        public void run() {
            try {
                this.delegate.run();
            }
            finally {
                // however execution goes, release permit for next task
                this.semaphore.release();
            }
        }

        @Override
        public final String toString() {
            return String.format("%s[delegate='%s']", getClass().getSimpleName(), this.delegate);
        }
    }

    @Nonnull
    private final Semaphore taskLimit;

    @Nonnull
    private final Duration timeout;

    @Nonnull
    private final Executor delegate;

    public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) {
        this.delegate = Objects.requireNonNull(executor, "'executor' must not be null");
        if (maximumTaskNumber < 1) {
            throw new IllegalArgumentException(String.format("At least one task must be permitted, not '%d'", maximumTaskNumber));
        }
        this.timeout = Objects.requireNonNull(maximumTimeout, "'maximumTimeout' must not be null");
        if (this.timeout.isNegative()) {
            throw new IllegalArgumentException("'maximumTimeout' must not be negative");
        }
        this.taskLimit = new Semaphore(maximumTaskNumber);
    }

    @Override
    public final void execute(final Runnable command) {
        Objects.requireNonNull(command, "'command' must not be null");
        try {
            // attempt to acquire permit for task execution
            if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) {
                throw new RejectedExecutionException(String.format("Executor '%s' busy", this.delegate));
            }
        }
        catch (final InterruptedException e) {
            // restore interrupt status
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit));
    }

    @Override
    public final String toString() {
        return String.format("%s[availablePermits='%s',timeout='%s',delegate='%s']", getClass().getSimpleName(), this.taskLimit.availablePermits(),
                this.timeout, this.delegate);
    }
}

1

我认为只需要使用ArrayBlockingQueue而不是LinkedBlockingQueue就可以了。

别理我...那完全是错的。 ThreadPoolExecutor 调用的是 Queue#offer 而不是 put,后者才会产生你需要的效果。

你可以扩展 ThreadPoolExecutor 并提供一个实现 execute(Runnable) 的方法,该方法调用 put 代替 offer

很抱歉,这似乎并不是完全令人满意的答案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接