将任务添加到ThreadPoolExecutor的BlockingQueue中是否可行?

21

针对JavaDoc中ThreadPoolExecutor的说明不明确,无法确定直接向支持执行器的BlockingQueue添加任务是否可以接受。然而文档中提到,调用executor.getQueue()主要用于调试和监控。

我使用自己的BlockingQueue构建了一个ThreadPoolExecutor,并保留了队列的引用,以便直接向其添加任务。由于getQueue()返回相同的队列,因此我假设getQueue()中的警告也适用于通过我的方式获得的后台队列引用。

示例

代码的一般模式如下:

int n = ...; // number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
// ...
while (...) {
    Runnable job = ...;
    queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
    try {
        Thread.sleep(...);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
executor.shutdownNow();

queue.offer() vs executor.execute()

通常使用executor.execute()添加任务,而我上面的例子则是在队列上阻塞而不是使用execute(),因为如果队列已满并拒绝我的任务,它就会立即失败。我也喜欢提交作业与阻塞队列交互;这感觉更像是"纯粹的"生产者-消费者。

直接添加任务到队列的一个影响是,我必须调用prestartAllCoreThreads(),否则没有工作线程正在运行。假设没有与执行器的其他交互,那么没有任何东西来监视队列(检查ThreadPoolExecutor源代码可证实)。这也意味着对于直接入队操作,ThreadPoolExecutor必须额外配置> 0个核心线程,并且不能配置为允许核心线程超时。

tl;dr

给定如下配置的ThreadPoolExecutor

  • 核心线程数> 0
  • 不允许核心线程超时
  • 已预启动核心线程
  • 持有指向支持执行器的BlockingQueue的引用

是否可以直接将任务添加到队列中,而不是调用executor.execute()

Related

这个问题 (生产者/消费者工作队列)类似,但没有特别涉及直接添加到队列的问题。


我保留队列的引用,这样我就可以直接向其中添加任务。但是为什么要这样做呢?为什么不直接将它们提交给执行器呢? - Raedwald
@Raedwald,请看我上面写的:“我上面的例子方法的好处是在队列上阻塞...” - user23987
5个回答

12

如果是我,我更倾向于使用Executor#execute()而不是Queue#offer(),因为我已经在使用java.util.concurrent中的其他所有内容。

你的问题很好,引起了我的兴趣,所以我查看了ThreadPoolExecutor#execute()的源代码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

我们可以看到execute()函数会在必要时对线程池进行操作,然后调用offer()将任务添加到工作队列。因此,建议使用execute(),不使用它可能(尽管我不确定)导致线程池的非最优操作。但是,我认为使用offer()不会 破坏执行器——似乎任务是使用以下方式从队列中取出的(这也来自ThreadPoolExecutor):

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

这个getTask()方法只是在循环内被调用,因此如果执行器没有关闭,它将阻塞直到一个新任务被添加到队列中(无论来自何处)。

注意:尽管我在此处发布了源代码片段,但我们不能依靠它们来获得确定性的答案 - 我们应该只编写符合API的代码。我们不知道execute()的实现将来会如何更改。


11

一个技巧是实现ArrayBlockingQueue的自定义子类,并重写offer()方法调用你的阻塞版本,这样你仍然可以使用正常的代码路径。

queue = new ArrayBlockingQueue<Runnable>(queueSize) {
  @Override public boolean offer(Runnable runnable) {
    try {
      return offer(runnable, 1, TimeUnit.HOURS);
    } catch(InterruptedException e) {
      // return interrupt status to caller
      Thread.currentThread().interrupt();
    }
    return false;
  }
};

(如您所料,我认为在队列上直接调用offer()作为您的正常代码路径可能是不好的想法。)


4

在实例化时,可以通过指定RejectedExecutionHandler来配置池在队列满时的行为。 ThreadPoolExecutor定义了四个策略作为内部类,包括AbortPolicyDiscardOldestPolicyDiscardPolicy以及我个人最喜欢的CallerRunsPolicy,它在控制线程中运行新作业。

例如:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        nproc, // core size
        nproc, // max size
        60, // idle timeout
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO
        new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

可以使用类似以下内容来实现所需的行为:

public class BlockingPolicy implements RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        executor.getQueue.put(r); // Self contained, no queue reference needed.
    }

在某个时刻,必须访问队列最好的方式是使用独立的RejectedExecutionHandler,避免直接操作池对象作用域的队列造成的任何代码重复或潜在错误。需要注意的是ThreadPoolExecutor中包含的处理程序本身使用 getQueue()


1
我喜欢你提出另一种选择。当时我在处理这个问题时的观点是ThreadPoolExecutor是一个实现细节。我可以使用阻塞队列来模拟生产者/消费者,并在不改变客户端代码的情况下更改工作调度的实现方式。 - user23987

2

如果您使用的队列与内存中的标准LinkedBlockingQueueArrayBlockingQueue实现完全不同,那么这是一个非常重要的问题。

例如,如果您正在使用基于单独持久性子系统(如Redis)的排队机制,在不同的计算机上使用多个生产者来实现生产者-消费者模式,则即使您不想要像OP一样的阻塞offer(),该问题本身也变得相关。

因此,给出的答案是必须调用prestartAllCoreThreads()(或足够的prestartCoreThread()次数),以使工作线程可用并运行,这一点非常重要,值得强调。


0
如果需要的话,我们还可以使用一个停车场来将主要处理与被拒绝的任务分开。
    final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT);
    final List<Runnable> taskParking = new LinkedList<Runnable>();
    BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1);
    RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r);
            taskCounter.countDown();
            taskParking.add(r);
        }
    };
    ThreadPoolExecutor threadPoolExecutor =  new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler);
    for(int i=0 ; i<TASKCOUNT; i++){
        //main 
        threadPoolExecutor.submit(getRandomTask());
    }
    taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS);
    System.out.println("Checking the parking lot..." + taskParking);
    while(taskParking.size() > 0){
        Runnable r = taskParking.remove(0);
        System.out.println("Running from parking lot..." + r);
        if(taskParking.size() > LIMIT){
          waitForSometime(...);
        }
        threadPoolExecutor.submit(r);
    }
    threadPoolExecutor.shutdown();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接