ExecutorService,如何等待所有任务完成

243

等待 ExecutorService 的所有任务完成的最简单方法是什么?我的任务主要是计算型的,所以我只想在每个核心上运行大量的作业。目前我的设置如下:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask 实现了可运行接口。代码似乎可以正确执行任务,但是在 wait() 上会崩溃,并出现 IllegalMonitorStateException。这很奇怪,因为我玩弄一些玩具例子,它似乎可以工作。

uniquePhrases 包含几万个元素。我应该使用另一种方法吗?我正在寻找尽可能简单的解决方案。

16个回答

260
最简单的方法是使用ExecutorService.invokeAll(),这个方法可以在一行代码中实现你想要的功能。在你的术语中,你需要修改或包装ComputeDTask来实现Callable<>,这样可以给你更多的灵活性。可能在你的应用程序中有一个有意义的Callable.call()实现,但如果没有,下面是一种包装的方法,可以使用Executors.callable()
ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) { 
    todo.add(Executors.callable(new ComputeDTask(singleTable))); 
}

List<Future<Object>> answers = es.invokeAll(todo);

正如其他人指出的那样,如果适当的话,你可以使用invokeAll()的超时版本。在这个例子中,answers将包含一堆将返回null的Future(请参见Executors.callable()的定义)。可能你想要做的是稍微重构一下,这样你就可以得到一个有用的答案,或者一个对底层ComputeDTask的引用,但从你的例子中我无法判断。
如果不清楚的话,请注意invokeAll()在所有任务完成之前不会返回。(即,如果被问及,你的answers集合中的所有Future都将报告.isDone()。)这避免了所有手动关闭、awaitTermination等操作,并允许你很好地为多个周期重复使用这个ExecutorService
SO上有一些相关的问题: 这些问题都不是针对你的问题,但它们确实提供了一些关于人们如何认为应该使用Executor/ExecutorService的信息。

11
如果您正在批量添加所有工作并保留Callable列表,则此方法非常完美,但如果您在回调或事件循环情况下调用ExecutorService.submit(),则此方法将无法正常工作。 - Desty
4
值得一提的是,当不再需要ExecutorService时仍应调用shutdown(),否则线程将永远不会终止(除非corePoolSize=0或allowCoreThreadTimeOut=true的情况)。 - John29
1
太棒了!正是我在寻找的。非常感谢分享答案。让我试一下。 - MohamedSanaulla
1
@Desty 在这种情况下,最好的实现方式是什么? - TommyQu

66

36
shutdown() 方法停止 ExecutorService 接收新任务并关闭空闲的工作线程。它没有指定等待关闭完成,ThreadPoolExecutor 中的实现也不会等待。 - Alain O'Dea
1
@Alain - 谢谢。我应该提到awaitTermination。已修复。 - NG.
5
如果一个任务需要安排进一步的任务才能完成,该怎么办?例如,您可以创建一个多线程树遍历程序,将分支交给工作线程处理。在这种情况下,由于ExecutorService会立即关闭,因此它无法接受任何递归调度的任务。 - Rag
2
"awaitTermination"需要作为参数的超时时间。虽然可以提供有限的时间并在其周围放置循环以等待所有线程完成,但我想知道是否有更优雅的解决方案。 - Abs
2
你说得对,但是看看这个答案 - https://dev59.com/PnM_5IYBdhLWcg3wvF3J#1250655 - 你总可以给它一个非常长的超时时间。 - NG.
显示剩余3条评论

52
如果你的目标不是等待ExecutorService中的所有任务完成,而是等待特定批次的任务完成,你可以使用CompletionService——具体来说是ExecutorCompletionService
创建一个包装你的ExecutorExecutorCompletionService,通过CompletionService submit 一些已知数量的任务,然后使用take()(阻塞)或poll()(非阻塞)从完成队列中提取相同数量的结果。一旦你获取到了提交的所有任务对应的预期结果,你就知道它们都完成了。
让我再说一遍,因为从界面上看不出来:你必须知道放入CompletionService中的东西数量,才能知道要尝试取出多少个东西。这在使用take()方法时尤其重要:如果调用次数太多,它将阻塞您的调用线程,直到其他线程向同一个CompletionService提交另一个作业。
书中有一些示例展示如何使用CompletionService,该书名为《Java并发编程实战》

这是对我的回答的很好的补充--我会说回答问题的直接方法是invokeAll();但是@seh在提交作业组到ES并等待它们完成时是正确的... --JA - andersoj
@om-nom-nom,感谢您更新链接。我很高兴看到答案仍然有用。 - seh
1
好的回答,我之前不知道CompletionService - Vic
1
这是一种方法,如果您不想关闭现有的ExecutorService,但只想提交一批任务,并知道它们何时全部完成。 - ToolmakerSteve

13

如果你想等待执行器服务完成执行,调用shutdown(),然后调用awaitTermination(units, unitType),例如awaitTermination(1, MINUTE)。ExecutorService不能在自己的监视器上阻塞,因此不能使用wait


我认为应该是awaitTermination。 - NG.
@SB - 谢谢 - 我看到我的记忆有误了!我已经更新了名称并添加了链接以确保正确。 - mdma
要“永久等待”,请像这样使用awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);。https://dev59.com/PnM_5IYBdhLWcg3wvF3J#1250655 - rogerdpack
我认为这是最简单的方法。 - Shervin Asgari
1
@MosheElisha,你确定吗?https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#shutdown-- 上说“启动有序关闭,在此过程中将执行先前提交的任务,但不会接受新任务。” - Jaime Hablutzel
@JaimeHablutzel 没错,你说得对。我也刚测试了一下,我认为我之前的评论是错误的。我会删除它,以免让人们感到困惑。 - MosheElisha

9
您可以在特定的时间间隔内等待作业完成:
int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

您可以使用ExecutorService.submit(Runnable),并收集其返回的Future对象,依次调用get()等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

InterruptedException 非常重要,需要妥善处理。它可以让您或您的库的用户安全地终止长时间运行的进程。


8

只需使用

latch = new CountDownLatch(noThreads)

每个线程中

latch.countDown();

并作为障碍

latch.await();

在等待时不要忘记捕获InterruptedException异常。 - Marcell

8

有几种方法。

你可以先调用 ExecutorService.shutdown 然后再调用 ExecutorService.awaitTermination,它会返回:

true 如果此执行程序终止并且 false 如果在终止之前超时

所以:

有一个名为 awaitTermination 的函数。但是它需要提供一个超时时间。这并不能保证当它返回时所有任务都已经完成了。有没有一种方法可以实现这一点?

你只需要在循环中调用 awaitTermination

使用 awaitTermination

这是一个完整的示例实现:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100));
        }

        int count = 0;

        // This is the relevant part
        // Chose the delay most appropriate for your use case
        executor.shutdown();
        while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
            System.out.println("Waiting "+ count);
            count++;
        }
    }

    private static Runnable parallelWork(long sleepMillis) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
        };
    }
}

使用 CountDownLatch:

另一个选项是创建一个 CountDownLatch,其 count 等于并行任务的数量。每个线程调用 countDownLatch.countDown();,而线程调用 countDownLatch.await();

此实现的完整示例:

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException {
        final int total_threads = 4;
        CountDownLatch countDownLatch = new CountDownLatch(total_threads);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, countDownLatch));
        }
        countDownLatch.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CountDownLatch countDownLatch) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            countDownLatch.countDown();
        };
    }
}

使用 CyclicBarrier:

另一种方法是使用 循环屏障(Cyclic Barrier)

public class WaitForAllToEnd {

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        final int total_threads = 4;
        CyclicBarrier barrier = new CyclicBarrier(total_threads+ 1);
        ExecutorService executor = Executors.newFixedThreadPool(total_threads);
        for(int i = 0; i < total_threads; i++){
            executor.execute(parallelWork(100 + i * 100, barrier));
        }
        barrier.await();
        System.out.println("Exit");
        executor.shutdown();
    }

    private static Runnable parallelWork(long sleepMillis, CyclicBarrier barrier) {
        return () -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Do Something
            }
            System.out.println("I am Thread : " + Thread.currentThread().getId());
            try {
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
              // Do something
            }
        };
    }
}

还有其他方法,但这些方法需要更改您的初始要求,即:

如何在使用 ExecutorService.execute() 提交任务时等待所有任务完成。


7
您可以使用ExecutorService.invokeAll方法,它将执行所有任务并等待所有线程完成其任务。
这里是完整的javadoc文档
您还可以使用重载版本的此方法来指定超时时间。
以下是带有ExecutorService.invokeAll的示例代码。
public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}

7

IllegalMonitorStateException的根本原因:

抛出此异常表示线程试图等待对象的监视器或通知等待对象的其他线程而不拥有指定的监视器。

从您的代码中,您刚刚在没有拥有锁的情况下调用了ExecutorService上的wait()方法。

以下代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

请按照以下任意一种方式等待所有已提交至 ExecutorService 的任务完成:

  1. Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object

  2. Using invokeAll on ExecutorService

  3. Using CountDownLatch

  4. Using ForkJoinPool or newWorkStealingPool of Executors(since java 8)

  5. Shutdown the pool as recommended in oracle documentation page

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }
    

    If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
    

    to

    a while(condition) which checks for every 1 minute.


3

我也遇到了一组需要爬取的文档。我从一个初始的“种子”文件开始处理,该文件包含指向其他需要处理的文件的链接,以此类推。

在我的主程序中,我只想编写以下内容,其中 Crawler 控制着一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想浏览一棵树,也会出现同样的情况;我会放入根节点,每个节点的处理器将根据需要向队列添加子节点,并且一堆线程将处理树中的所有节点,直到没有更多节点。
我在JVM中找不到任何令我感到惊讶的东西。因此,我编写了一个名为ThreadPool的类,可以直接使用或子类化以添加适合该领域的方法,例如schedule(Document)。希望这会有所帮助! ThreadPool Javadoc | Maven

文档链接已失效。 - MDT

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接