我无法使用shutdown()
和awaitTermination()
,因为在等待期间有可能向ThreadPoolExecutor添加新任务。
因此,我正在寻找一种方法,在不阻止添加新任务的情况下等待ThreadPoolExecutor清空其队列并完成所有任务。
如果有区别的话,这是用于Android的。
谢谢
更新:许多周之后重新审视此问题后,我发现修改后的CountDownLatch在这种情况下更适合我。我将保留已标记的答案,因为它更适用于我的要求。
我无法使用shutdown()
和awaitTermination()
,因为在等待期间有可能向ThreadPoolExecutor添加新任务。
因此,我正在寻找一种方法,在不阻止添加新任务的情况下等待ThreadPoolExecutor清空其队列并完成所有任务。
如果有区别的话,这是用于Android的。
谢谢
更新:许多周之后重新审视此问题后,我发现修改后的CountDownLatch在这种情况下更适合我。我将保留已标记的答案,因为它更适用于我的要求。
ExecutorService.submit(Runnable)
方法。调用这个方法会返回一个Future
对象,可以将其放入一个Collection
中,并在主线程中迭代它们,对于每一个迭代元素,调用Future.get()
方法。这将导致主线程暂停执行直到ExecutorService
处理完所有的Runnable
任务。Collection<Future<?>> futures = new LinkedList<Future<?>>();
futures.add(executorService.submit(myRunnable));
for (Future<?> future:futures) {
future.get();
}
Executors.callable()
方法,将Runnable
转换为Callable
非常简单。 - andersoj我的场景是一个网络爬虫,用于从网站中获取一些信息并进行处理。由于在同一时间内可以加载许多页面,因此使用了ThreadPoolExecutor以加快进程速度。因为爬虫会跟随每个页面中的超链接创建新任务,所以新任务将被创建到现有任务中。问题仍然存在:主线程不知道何时所有任务都已完成,才能开始处理结果。我使用了一种简单的方法来解决这个问题。虽然不太优雅,但在我的情况下起作用:
while (executor.getTaskCount()!=executor.getCompletedTaskCount()){
System.err.println("count="+executor.getTaskCount()+","+executor.getCompletedTaskCount());
Thread.sleep(5000);
}
executor.shutdown();
executor.awaitTermination(60, TimeUnit.SECONDS);
也许您正在寻找一个CompletionService来管理任务批处理,还可以参考this answer。
我认为您需要澄清您的问题,因为其中存在一个隐含的无限条件......在某个时刻,您必须决定关闭执行器,此时它将不再接受任何任务。您的问题似乎暗示着您希望等待直到您知道没有更多的任务将被提交,这只有在您自己的应用程序代码中才能知道。
以下答案将允许您平稳地过渡到一个新的TPE(出于任何原因),完成所有当前提交的任务,并且不会拒绝将新任务分配给新的TPE。它可能回答了您的问题。@Thilo的答案也可能。
假设您已经在某个地方定义了一个可见的TPE:
AtomicReference<ThreadPoolExecutor> publiclyAvailableTPE = ...;
您可以将TPE交换例程编写如下。它也可以使用同步方法编写,但我认为这样更简单:
void rotateTPE()
{
ThreadPoolExecutor newTPE = createNewTPE();
// atomic swap with publicly-visible TPE
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(newTPE);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
或者,如果你真的想要杀死线程池,那么提交任务的一方将需要在某个时候处理被拒绝的任务,并且你可以使用 null
作为新的 TPE:
void killTPE()
{
ThreadPoolExecutor oldTPE = publiclyAvailableTPE.getAndSet(null);
oldTPE.shutdown();
// and if you want this method to block awaiting completion of old tasks in
// the previously visible TPE
oldTPE.awaitTermination();
}
这可能会导致上游问题,调用者需要知道如何处理 null
。
您还可以使用一个虚拟的 TPE 来替换,它只是拒绝每个新执行,但这相当于在 TPE 上调用 shutdown()
。
shutdown
,请按照以下方法操作:
迭代所有从ExecutorService
的submit方法返回的Future
任务,并使用阻塞调用get()
检查Future
对象的状态,如Tim Bender
所建议的那样。
使用以下其中之一:
ExecutorService
上使用invokeAllExecutors
的ForkJoinPool或newWorkStealingPool(自Java 8以来)invokeAll()
在执行器服务上也可以实现与 CountDownLatch
相同的目的。
相关 SE 问题:
Runner runner = Runner.runner(10);
runner.runIn(2, SECONDS, runnable);
runner.run(runnable); // each of this runnables could submit more tasks
runner.waitTillDone(); // blocks until all tasks are finished (or failed)
// and now reuse it
runner.runIn(500, MILLISECONDS, callable);
runner.waitTillDone();
runner.shutdown();
要使用它,请将此Gradle/Maven依赖项添加到您的项目中:'com.github.matejtymes:javafixes:1.0'
有关更多详细信息,请查看这里:https://github.com/MatejTymes/JavaFixes 或这里:http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html
while (executor.getThreadPoolExecutor().getActiveCount() != 0 || !executor.getThreadPoolExecutor().getQueue().isEmpty()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}