带有有界队列的Java线程池

37

我正在使用 java.util.concurrent 库中的 Executors 类来创建一个固定大小的线程池,用于运行 Web 服务器的请求处理程序:

static ExecutorService  newFixedThreadPool(int nThreads) 

并且描述如下:

创建一个线程池,该线程池重用一组固定的线程,这些线程从共享的无界队列中运行。

然而,我正在寻找一个线程池实现,它将做完全相同的事情,只不过使用一个有界队列。是否有这样的实现?或者我需要为固定线程池实现自己的包装器?

4个回答

47
你想要做的是创建自己的ExecutorService,可能使用ThreadPoolExecutor。ThreadPoolExecutor有一个构造函数,它接受一个BlockingQueue,为了获得一个有界队列,你可以使用例如正确构造的ArrayBlockingQueue。你还可以包括一个RejectedExecutionHandler来确定当队列已满时该怎么做,或者保留对阻塞队列的引用并使用offer方法。
这里有一个小例子:
BlockingQueue<Runnable> linkedBlockingDeque = new LinkedBlockingDeque<Runnable>(
    100);
ExecutorService executorService = new ThreadPoolExecutor(1, 10, 30,
    TimeUnit.SECONDS, linkedBlockingDeque,
    new ThreadPoolExecutor.CallerRunsPolicy());

5
你知道为什么没有一个“PostingBlocksPolicy”可以在任务“可以”被发布之前进行阻塞吗?我希望确保工作(最终)能够完成,所以丢弃或中止策略都不可行,而CallerRuns也不适用,因为我使用(单线程)线程池的整个目的是确保工作在特定的单个线程上完成。 - bacar
1
实际上并不存在这种情况,而且如果你要实现一个的话,可能会有些“尴尬”,因为调用者永远不会控制等待。然而,你可以直接调用linkedBlockingDequeue.offer(x)并对结果采取一些操作。 while( !linkedBlockingDequa.offer( myRunnable ) ) { // 等待,或执行某些操作,或抛出异常 } - lscoughlin
如果您有2个阻塞的任务和线程池中的两个线程,如果将队列大小设置为5并添加一个任务,则该任务将需要很长时间才能执行,或者如果2个线程挂起,则可能永远无法执行。 - Alex Punnen
@AlexPunen 是的。那又怎样?有办法管理这种情况,但这超出了所问问题的范围。如果你自己遇到了这个问题,请自己发布问题,我相信SO会提供帮助的 :P - lscoughlin

7
创建一个ThreadPoolexecutor并在其中传递适当的BlockingQueue实现。例如,您可以在ThreadPoolExecutor构造函数中传递ArrayBlockingQueue以获得所需的效果。

6
我用一个Semaphore来解决这个问题。我使用它来限制向ExecutorService提交的任务数量。

例如:

int threadCount = 10;
ExecutorService consumerPool = Executors.newFixedThreadPool(threadCount);

// set the permit count greater than thread count so that we 
// build up a limited buffer of waiting consumers
Semaphore semaphore = new Semaphore(threadCount * 100); 

for (int i = 0; i < 1000000; ++i) {
    semaphore.acquire(); // this might block waiting for a permit 
    Runnable consumer = () -> {
       try {
          doSomeWork(i);
       } finally {
          semaphore.release(); // release a permit 
       }
    };
    consumerPool.submit(consumer);
}

1
这个解决方案可能会导致许可泄漏。例如,如果未来被取消执行,信号量将永远不会被释放,一个许可证将永久丢失。如果任务被 ExecutorService 拒绝,同样的问题也会发生。 - Ildar Faizov
如果将释放操作放在消费者之外会怎样呢?比如你这样做:CompletableFuture.supplyAsync(() -> doSomeWork(i), consumerPool).whenCompleteAsync((i, err)->semaphore.release(), consumerPool) - JSelser

4

当您创建ThreadPoolExecutor时,可以为其提供有界的BlockingQueue和RejectedExecutionHandler,以便在达到限制时控制发生的情况。默认行为是抛出RejectedExecutionException异常。

您还可以定义自己的线程工厂,以控制线程名称并使它们成为守护线程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接