如何在不关闭执行器的情况下等待线程池中所有任务完成?

72

我无法使用shutdown()awaitTermination(),因为在等待期间有可能向ThreadPoolExecutor添加新任务。

因此,我正在寻找一种方法,在不阻止添加新任务的情况下等待ThreadPoolExecutor清空其队列并完成所有任务。

如果有区别的话,这是用于Android的。

谢谢

更新:许多周之后重新审视此问题后,我发现修改后的CountDownLatch在这种情况下更适合我。我将保留已标记的答案,因为它更适用于我的要求。


如果您可以接受添加新任务,那么如果它永远无法完成会发生什么? - Kylar
2
我并不太担心它永远无法完成,因为如果是这种情况,那么其他事情已经出了大问题。如果所有其他方法都失败了,我可以实现某种超时,但我认为假设它会完成也没关系。我希望在等待期间能够添加新任务,或者换句话说,我希望在调用等待后能够添加新任务。 - cottonBallPaws
我认为这是一个相关的问题:https://dev59.com/-0_Ta4cB1Zd3GeqPBogo。你可能想要查看那里列出的答案... - sjlee
你是否只是想等待异步通知,以便在队列恰好为空时立即关闭它?TPE(更一般地说,线程语义)可能不允许您这样做,即使您可以这样做,也很难从任务提交者的角度进行推理。 - andersoj
请参见https://dev59.com/PnM_5IYBdhLWcg3wvF3J。 - rogerdpack
显示剩余2条评论
7个回答

79
如果你想知道某个任务或一批任务何时完成,可以使用ExecutorService.submit(Runnable)方法。调用这个方法会返回一个Future对象,可以将其放入一个Collection中,并在主线程中迭代它们,对于每一个迭代元素,调用Future.get()方法。这将导致主线程暂停执行直到ExecutorService处理完所有的Runnable任务。
Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
    future.get();
}

2
+1 那似乎是最好的方法。但必须在应用程序级别完成,也就是在提交任务的地方,而不是在执行器服务级别完成。 - Thilo
12
可以使用+1,或在一批任务上使用invokeAll()来等待完成。请参见我在此处的答案:https://dev59.com/KnA75IYBdhLWcg3wfpGr#3269888 - andersoj
1
如果您愿意将任务更改为“Callable”而不是“Runnable”,那么@andersoj提到的解决方案要简单得多。 - Tim Bender
1
使用Executors.callable()方法,将Runnable转换为Callable非常简单。 - andersoj
1
@littleFluffyKitty,在原始答案中,LinkedList的内存将在堆空间中存在,只要变量在作用域内并继续指向堆中的数据结构。如果您将变量定义为某个方法的局部变量,则它将在该方法的持续时间内存在。我不太清楚您打算如何使用此片段。当然,我是以Java开发人员的身份思考的,而Android不是Java。 - Tim Bender
显示剩余11条评论

8

我的场景是一个网络爬虫,用于从网站中获取一些信息并进行处理。由于在同一时间内可以加载许多页面,因此使用了ThreadPoolExecutor以加快进程速度。因为爬虫会跟随每个页面中的超链接创建新任务,所以新任务将被创建到现有任务中。问题仍然存在:主线程不知道何时所有任务都已完成,才能开始处理结果。我使用了一种简单的方法来解决这个问题。虽然不太优雅,但在我的情况下起作用:

while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
    System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
    Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);

6

你的链接似乎有问题。 - qwertzguy
@qwertzguy:已更换为更加稳定的版本。谢谢。 - Thilo

3

我认为您需要澄清您的问题,因为其中存在一个隐含的无限条件......在某个时刻,您必须决定关闭执行器,此时它将不再接受任何任务。您的问题似乎暗示着您希望等待直到您知道没有更多的任务将被提交,这只有在您自己的应用程序代码中才能知道。

以下答案将允许您平稳地过渡到一个新的TPE(出于任何原因),完成所有当前提交的任务,并且不会拒绝将新任务分配给新的TPE。它可能回答了您的问题。@Thilo的答案也可能。

假设您已经在某个地方定义了一个可见的TPE:

AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;

您可以将TPE交换例程编写如下。它也可以使用同步方法编写,但我认为这样更简单:

void rotateTPE()
{
   ThreadPoolExecutor newTPE = createNewTPE();
   // atomic swap with publicly-visible TPE
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

或者,如果你真的想要杀死线程池,那么提交任务的一方将需要在某个时候处理被拒绝的任务,并且你可以使用 null 作为新的 TPE:

void killTPE()
{
   ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
   oldTPE.shutdown();

   // and if you want this method to block awaiting completion of old tasks in  
   // the previously visible TPE
   oldTPE.awaitTermination();
} 

这可能会导致上游问题,调用者需要知道如何处理 null

您还可以使用一个虚拟的 TPE 来替换,它只是拒绝每个新执行,但这相当于在 TPE 上调用 shutdown()


感谢您抽出时间写下这些内容,我会仔细查看并确定哪种方法最适合。 - cottonBallPaws

1
如果您不想使用shutdown,请按照以下方法操作:
  1. 迭代所有从ExecutorService的submit方法返回的Future任务,并使用阻塞调用get()检查Future对象的状态,如Tim Bender所建议的那样。

  2. 使用以下其中之一:

    1. ExecutorService上使用invokeAll
    2. 使用CountDownLatch
    3. 使用ExecutorsForkJoinPoolnewWorkStealingPool(自Java 8以来)

invokeAll() 在执行器服务上也可以实现与 CountDownLatch 相同的目的。

相关 SE 问题:

如何等待一组线程完成?


0
你可以在Runner类上调用waitTillDone()方法:
Runner runner = Runner.runner(10);

runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks

runner.waitTillDone(); // blocks until all tasks are finished (or failed)

// and now reuse it

runner.runIn(500, MILLISECONDS, callable);

runner.waitTillDone();
runner.shutdown();

要使用它,请将此Gradle/Maven依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.0'

有关更多详细信息,请查看这里:https://github.com/MatejTymes/JavaFixes 或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html


0
请尝试使用以下所示的队列大小和活动任务计数。
 while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
                     try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
            }
        }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接