使用ExecutorService如何等待所有线程完成?

459

我需要每次执行4个任务,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

我该如何在所有任务完成后得到通知?目前,我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时将其减少,然后监视这个计数器是否变为0;或者获取Future列表并在无限循环中监视它们全部都已完成。有没有更好的解决方案,不涉及无限循环?

谢谢。

27个回答

516

ExecutorService 上,你可以调用 shutdown() 方法,接着调用 awaitTermination() 方法:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

11
这正是shutdown/awaitTermination的用途所在。 - matt b
39
如果这个任务处理是一次性事件,那么这种模式很好。但如果在同一运行时重复执行,那么不是最优的,因为每次执行时都会重复创建和撤销线程。 - sjlee
49
我正在寻找任何官方文档,证明 Long.MAX_VALUE, TimeUnit.NANOSECONDS 等同于没有超时。 - Sam Harwell
18
我无法相信你必须使用 shutdown 来加入所有当前线程(使用 shutdown 后,就不能再次使用执行器)。建议改用 Future 列表代替。 - rogerdpack
24
请参考java.util.concurrent包下的文档,在“Timing”部分查看:要等待“永远”,您可以使用Long.MAX_VALUE的值。 - beluchin
显示剩余17条评论

198

使用CountDownLatch

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

在任务中使用try/finally语句来保证代码的执行(将代码放在try代码块中,必要时在finally代码块中清理资源)。

latch.countDown();

4
没有4个任务,而是会每次完成“一些任务”中的4个。 - cletus
2
抱歉,我误解了问题。是的,任务数量应该作为CountDownLatch构造函数的参数。 - ChssPly76
7
我认为这个解决方案比其他方案更优雅,它看起来就像是为此目的而制作的,而且简单明了。 - wvdschel
3
如果在开始之前你不知道任务的数量,该怎么办? - cletus
14
如果您知道任务的数量,线程池设置需要根据部署进行配置并且可以重复使用,则不需要使用CountDownLatch。我并不想争辩这种方法是否比您的方法更好。但是,在实际场景中,我发现我确实知道任务的数量,线程池设置需要根据部署进行配置,并且可以重复使用。因此,我通常会通过Spring注入线程池,并将它们设置为原型,只手动关闭它们以等待线程完成似乎不太理想。 - ChssPly76
显示剩余6条评论

102

ExecutorService.invokeAll()可以为您完成此操作。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

当您必须逐个启动“4”个线程,分段,然后加入/等待所有4个线程完成时,就会遇到困难。 - rogerdpack
@rogerdpack:我还在学习这个执行器和相关的东西。但是针对你的问题,每次只能同时运行4个线程,不应该将它们作为批处理任务的一部分来执行吗? - Mukul Goel
3
只有在预先知道任务数量的情况下,这种方法才有效。 - Konstantin
6
我认为当“futures”被返回时,任务尚未完成。它们可能会在未来完成,并且您将获得结果的链接。这就是为什么它被称为“Future”。您可以使用Future.get()方法等待任务完成以获取结果。 - AlikElzin-kilaka
13
引用自JavaDocs(链接在答案中):“执行给定的任务,返回一个Future列表,其中包含它们的状态和结果。当所有任务完成时,Future.isDone()对于返回列表中的每个元素都为真。” - Hulk
3
请注意,executorService.invokeAll将等待所有线程完成,但您仍需要调用executorService.shutdown来清理线程池。 - The Gilbert Arenas Dagger

55
您可以使用“未来列表”(Lists of Futures):
List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

那么当你想要将它们全部连接起来时,本质上相当于在每个线程上进行连接(并且额外的好处是可以将子线程中的异常重新抛出到主线程):

for(Future f: this.futures) { f.get(); }

基本上的技巧是逐个调用每个Future的.get(),而不是无限循环调用isDone()(全部或每个)。 这样,只要最后一个线程完成,就保证可以“通过”并越过此块。但需要注意的是,由于.get()调用会重新引发异常,如果其中一个线程死亡,则您可能会在其他线程完成之前从该处引发异常[为避免这种情况,您可以在get调用周围添加catch ExecutionException]。另一个需要注意的是,它会保留对所有线程的引用,因此如果它们具有线程本地变量,则在您越过此块之前它们将不会被收集(尽管如果成为问题,您可能可以通过从ArrayList中删除Future来解决)。 如果您想知道哪个Future“先完成”,可以使用类似https://dev59.com/mHM_5IYBdhLWcg3wfTE0#31885029的东西。


3
使用ExecutorCompletionService.take可以知道哪个任务"先完成": https://dev59.com/b2ct5IYBdhLWcg3wuPu6#11872604 - ToolmakerSteve

50

在Java8中,您可以使用CompletableFuture实现:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

4
这是一个非常优雅的解决方案。 - mattvonb
es.shutdown()之后需要awaitTermination()吗?@AdamSkywalker - Gaurav
@gaurav 当你调用shutdown时,可能有一些任务还没有完成。因此,awaitTermination将阻塞调用线程,直到所有任务都完成。这取决于你是否需要在此线程中等待结果。 - AdamSkywalker
@AdamSkywalker 很棒的回答。如果我不需要等待结果,那么不调用awaitTermination()是有道理的。 - Gaurav
@AdamSkywalker 很棒的回答!!优雅! - nagendra547

28

仅仅是我的个人意见。 为了克服CountDownLatch在事先知道任务数量的要求,你可以使用一种老式的方法,即使用一个简单的Semaphore

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

在你的任务中,只需像调用latch.countDown();一样调用s.release()即可。


看到这个,我起初想知道如果一些“release”调用发生在“acquire”调用之前是否会有问题,但是阅读了Semaphore文档后,我知道这是可以的。 - ToolmakerSteve

15
有点晚了,但为了完整起见...
不要“等待”所有任务完成,你可以按照好莱坞原则考虑,“别打电话给我,我会打给你”——当我完成后。 我认为这种方式得到的代码更加优雅...
Guava提供了一些有趣的工具来实现这一点。
一个例子:
将ExecutorService包装成ListeningExecutorService:
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交可调用对象集合以进行执行 ::
for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

现在是重要的部分:
ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

将回调附加到ListenableFuture,以便在所有未来完成时通知您:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

这样做的好处是,一旦处理完成,您就可以在一个地方收集所有结果...

更多信息在此处


2
非常干净。即使在Android上也能完美运行。只需要在onSuccess()中使用runOnUiThread()即可。 - DSchmidt

13

Java 5及更高版本中的CyclicBarrier类专为此类事情而设计。


7
酷,我总是记不住这个数据结构的名字。然而,只有在您事先知道将排队的任务数量时才适用。 - ՕլՁՅԿ
是的,你可能认为你可以用当前线程和所有子线程来击中屏障,然后当你通过它时,你就会知道子线程已经完成了... - rogerdpack
实际上这是错误的答案。CyclicBarrier设计用于分段。CountDownLatch设计用于等待事件。 - gstackoverflow

11

这里有两个选项,只是有点困惑哪一个更好。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

选项二:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

把future.get()放在try catch中是个好主意,对吗?


最好将 future.get 调用计时 future.get(10, TimeUnit.SECONDS); 并捕获 TimeoutException。 - Siddhartha

9
跟以下其中一种方法:
  1. 遍历从 ExecutorServicesubmit 返回的所有 Future 任务,并使用阻塞调用 get()Future 对象上检查状态,如 Kiran 建议的那样。
  2. ExecutorService 上使用 invokeAll()
  3. CountDownLatch
  4. ForkJoinPoolExecutors.html#newWorkStealingPool
  5. 按正确顺序使用 ThreadPoolExecutor 的 shutdown、awaitTermination、shutdownNow API

相关 SE 问题:

如何在 Java 多线程中使用 CountDownLatch?

如何正确关闭 java ExecutorService


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接