使用Java ExecutorService,如何完成正在运行的任务但停止等待任务的处理?

15

我正在使用ExecutorService(ThreadPoolExecutor)来运行(和排队)大量任务。我试图编写一些尽可能优雅的关闭代码。

ExecutorService有两种关闭方式:

  1. 我可以调用ExecutorService.shutdown(),然后调用ExecutorService.awaitTermination(...)
  2. 我可以调用ExecutorService.shutdownNow()

根据JavaDoc,shutdown命令:

Initiates an orderly shutdown in which previously submitted
tasks are executed, but no new tasks will be accepted.

而且还有shutdownNow命令:

Attempts to stop all actively executing tasks, halts the
processing of waiting tasks, and returns a list of the tasks that were
awaiting execution.

我希望介于这两种选项之间。

我想调用一个命令,它:
  a. 完成当前活动的任务或任务(例如shutdown)。
  b. 停止等待任务的处理(例如shutdownNow)。

例如:假设我有一个具有3个线程的ThreadPoolExecutor。 它当前在队列中有50个任务,其中前3个正在运行。 我想允许这3个活动任务完成,但我不希望剩下的47个任务开始。

我认为我可以通过保持Future对象列表的方式以这种方式关闭ExecutorService,然后在所有这些对象上调用cancel。 但由于从多个线程向此ExecutorService提交任务,因此没有干净的方法来实现这一点。

我真的希望我错过了一些明显的东西,或者有一种清洁的方法来做到这一点。

谢谢任何帮助。


4
+1 很难理解为什么 ExecutorService 没有包含这样的功能。有时候,语言/库的开发者似乎从未编写过任何多线程应用程序。清空队列、设置一些“已中断”的标志(或推送自杀请求),然后等待当前可运行任务完成,似乎是一个合理且常见的要求。 - Martin James
这似乎是一个非常合理的请求。既然我订阅了concurrency-interest邮件列表,我会在那里提问。 - user949300
我得到的建议与不可信和忽必烈·汗的建议非常相似。要么检查isShutdown(),要么调用shutdown()并清除(或排空)队列。 - user949300
3个回答

11

我最近遇到了这个问题。可能有更加优雅的方法,但我的解决方案是先调用 shutdown(),然后取出由 ThreadPoolExecutor 使用的 BlockingQueue 并对其调用 clear()(或将其排空到另一个Collection中进行存储)。最后,调用 awaitTermination() 允许线程池完成当前正在处理的任务。

例如:

public static void shutdownPool(boolean awaitTermination) throws InterruptedException {

    //call shutdown to prevent new tasks from being submitted
    executor.shutdown();

    //get a reference to the Queue
    final BlockingQueue<Runnable> blockingQueue = executor.getQueue();

    //clear the Queue
    blockingQueue.clear();
    //or else copy its contents here with a while loop and remove()

    //wait for active tasks to be completed
    if (awaitTermination) {
        executor.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
}

这种方法将在包装ThreadPoolExecutor,并带有引用executor的调度类中实现。

需要注意来自ThreadPoolExecutor.getQueue() Javadoc的以下重要内容:

访问任务队列主要用于调试和监视。该队列可能正在被使用。检索任务队列不会防止已排队的任务执行。

这突出了一个事实,即在您清空它时,可以从BlockingQueue中轮询附加任务。但是,根据该接口文档,所有BlockingQueue实现都是线程安全的,因此这不应该会引起问题。


1
我会尝试一下。BlockingQueue在几乎每个操作中都使用ReentrantLock,所以我不确定是否需要synchronized - Jeff Goldberg
1
另外需要注意的是,在您的示例代码中,应该在清空队列之前调用executor.shutdown();。否则存在另一个线程可能会在您清空队列但尚未关闭ExecutorService之前添加任务的风险。由于shutdown仅防止添加新任务,因此首先调用它是安全的。 - Jeff Goldberg
@JeffGoldberg 我知道已经过去了一年半了,但是我又看到了这个答案,并意识到同步块实际上是不必要的,因为所有的BlockingQueue都被记录为是线程安全的。不仅如此,我还错误地使用了synchronized,而它也不会起到任何作用。祝一切顺利。 - Paul Bellora
你正在获取对“final”变量的引用并清除它,而不是使用executor.getQueue().clear()。你获取引用的原因是什么?这个引用为什么是final的? - kommradHomer
@kommradHomer 不,那只是风格问题。executor.getQueue().clear() 是等效的,这可能是我实际代码中使用的方式。 - Paul Bellora
显示剩余7条评论

5
您需要使用的是shutdownNow()方法。您错过了第一个单词Attempts和整个第二段的javadoc

除了尽最大努力试图停止正在执行任务的处理之外,没有其他保证。例如,典型的实现将通过Thread.interrupt()进行取消,因此任何无法响应中断的任务可能永远不会终止。

因此,只有那些定期检查Thread#isInterrupted()的任务(例如在while (!Thread.currentThread().isInterrupted())循环中)将被终止。但是如果您的任务没有在检查这个方面,它将继续运行。

我对这种方法的担忧是,有时我确实想要中断活动任务。它们目前会检查中断状态并正确处理InterruptedException,因为在紧急情况下(或者在每日周期结束时),我需要立即关闭所有东西。 - Jeff Goldberg
在中断状态旁边添加一个额外的检查,以确定是否存在紧急状态的方法。 - BalusC
除了 Thread.isInterrupted() 之外,还有其他的中断点,包括 Thread.sleep()Thread.join()Object.wait() - John Haager
@John:是的,指的是那些抛出“InterruptedException”的异常。但在执行器任务中使用它们非常罕见。 - BalusC
@BalusC:我认为这是一个有效且好的答案,但我不同意在执行器任务中很少发现InterruptedException情况。我的任务肯定会处理它们——有很多线程正在等待第三方活动以释放其他线程的资源。事实上,我不明白为什么你不会总是期望这种行为。InterruptedException似乎特别设计用于同时由执行器运行多个线程的情况。 - Jeff Goldberg
也许是因为我自己从未在真实世界的应用中使用过或看到过它们。但这可能是因为我主要只开发网络应用程序。 - BalusC

5
您可以在每个提交的任务周围添加一些额外的逻辑。
wrapper = new Runnable()
    public void run()
        if(executorService.isShutdown())
            throw new Error("shutdown");
        task.run();

executorService.submit(wrapper);

额外检查的开销可以忽略不计。在执行器被关闭后,包装器仍将被执行,但原始任务不会执行。


1
这是一个非常好的想法。为了避免在代码中包装每个单独的Runnable或Callable,您可以扩展ThreadPoolExecutor(或您选择的ExecutorService),覆盖重载的newTaskFor方法,并使用特殊的“AbortOnShutdownCallable/Runnable”包装所有传入的Callable/Runnable任务。如果忽必烈的“清除阻塞队列”方法最终不起作用,我会尝试这个方法。但无论如何+1。 - Jeff Goldberg

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接