如何等待ThreadPoolExecutor完成任务

37

我的问题:如何在ThreadPoolExecutor上执行一堆线程对象,等待它们全部完成后再继续执行下去?

我是新手,正在通过这段代码学习ThreadPoolExecutor的工作原理。目前,我甚至没有用对象填充BlockingQueue,因为我不知道如何在不使用另一个RunnableObject调用execute()的情况下启动队列。无论如何,现在我只是调用了awaitTermination(),但我认为我还是缺少了什么。有任何提示都将是极好的!谢谢。

public void testThreadPoolExecutor() throws InterruptedException {
  int limit = 20;
  BlockingQueue q = new ArrayBlockingQueue(limit);
  ThreadPoolExecutor ex = new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
  for (int i = 0; i < limit; i++) {
    ex.execute(new RunnableObject(i + 1));
  }
  ex.awaitTermination(2, TimeUnit.SECONDS);
  System.out.println("finished");
}

RunnableObject类:

package playground;

public class RunnableObject implements Runnable {

  private final int id;

  public RunnableObject(int id) {
    this.id = id;
  }

  @Override
  public void run() {
    System.out.println("ID: " + id + " started");
    try {
      Thread.sleep(2354);
    } catch (InterruptedException ignore) {
    }
    System.out.println("ID: " + id + " ended");
  }
}

1
我认为在您发布问题后立即回答并暗示您自己的答案不太合适并不是一个好主意。最好的做法是更新您的原始问题,解释为什么那个答案不足够。 - Kiril
1
@Link,你说得对。我会修复它的。 - kentcdodds
我刚刚编辑了我的答案,展示了一种在关闭执行器之前等待所有作业完成的方法。 - Gray
可能是重复的问题:如何使用ExecutorService等待所有线程完成? - rogerdpack
6个回答

54

你应该在awaitTermination上使用循环。

ExecutorService threads;
// ...
// Tell threads to finish off.
threads.shutdown();
// Wait for everything to finish.
while (!threads.awaitTermination(10, TimeUnit.SECONDS)) {
  log.info("Awaiting completion of threads.");
}

1
完全解决了。我没有注意到awaitTermination会返回任何内容。没有人提到过我需要循环遍历它来阻塞。谢谢! - kentcdodds
11
为什么要用循环呢?为什么不只是增加等待的秒数呢?通常我会使用awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)或类似的方法。 - Gray
1
@Gray - 我最初只在我的代码中调用了一次它,有时会出现非常奇怪的退出问题,或者所有内容都被锁定。最终我追踪到这个问题,所以现在我循环和记录一个“等待”消息,以便我知道发生了什么奇怪的事情。 - OldCurmudgeon
3
我喜欢while循环,因为我有一个执行器来处理文件任务,而它们完成的时间取决于文件数量,是未知的。 - Broken_Window

9

您的问题似乎是在您将所有作业提交到池后没有调用shutdown。如果没有shutdown(),则您的awaitTermination将始终返回false。

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}
// you are missing this line!!
ex.shutdown();
ex.awaitTermination(2, TimeUnit.SECONDS);

您还可以像下面这样做,等待所有作业完成:

您可以采用以下方法等待所有作业完成:

List<Future<Object>> futures = new ArrayList<Future<Object>>();
for (int i = 0; i < limit; i++) {
  futures.add(ex.submit(new RunnableObject(i + 1), (Object)null));
}
for (Future<Object> future : futures) {
   // this joins with the submitted job
   future.get();
}
...
// still need to shutdown at the end
ex.shutdown();

另外,由于您睡眠了2354毫秒,但只等待所有工作终止2SECONDS,因此awaitTermination将始终返回false。我建议使用Long.MAX_VALUE来等待作业完成。
最后,您似乎在担心创建新的ThreadPoolExecutor而希望重用第一个。不必如此。与您编写的任何检测作业是否完成的代码相比,GC开销极小。
引用自javadocs的话:ThreadPoolExecutor.shutdown()
启动有序关闭,在此关闭中执行先前提交的任务,但不会接受新任务。如果已关闭,则调用不会有其他效果。
ThreadPoolExecutor.awaitTermination(...)方法中,它正在等待执行器的状态转换为TERMINATED。但首先状态必须转换为SHUTDOWN(如果调用了shutdown())或STOP(如果调用了shutdownNow())。

3

这与执行程序本身无关。只需使用接口的java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<T>>)方法。它将阻塞,直到所有Callable都执行完成。

Executors旨在长期存在; 超出了一组任务的生命周期。 shutdown方法是用于应用程序结束和清理的。


你能详细解释一下这个答案吗?我喜欢这个答案,但是我在实现它时遇到了困难。谢谢。 - kentcdodds

2

以下是对已接受答案的一种变体,可以处理当抛出InterruptedException时的重试:

executor.shutdown();

boolean isWait = true;

while (isWait)
{
    try
    {             
        isWait = !executor.awaitTermination(10, TimeUnit.SECONDS);
        if (isWait)
        {
            log.info("Awaiting completion of bulk callback threads.");
        }
    } catch (InterruptedException e) {
        log.debug("Interruped while awaiting completion of callback threads - trying again...");
    }
}

1

另一种方法是使用CompletionService,如果您需要尝试任何任务结果,则非常有用:

//run 3 task at time
final int numParallelThreads = 3;

//I used newFixedThreadPool for convenience but if you need you can use ThreadPoolExecutor
ExecutorService executor = Executors.newFixedThreadPool(numParallelThreads);
CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);

int numTaskToStart = 15;

for(int i=0; i<numTaskToStart ; i++){
    //task class that implements Callable<String> (or something you need)
    MyTask mt = new MyTask();

    completionService.submit(mt);
}

executor.shutdown(); //it cannot be queued more task

try {
    for (int t = 0; t < numTaskToStart ; t++) {
        Future<String> f = completionService.take();
        String result = f.get();
        // ... something to do ...
    }
} catch (InterruptedException e) {
    //termination of all started tasks (it returns all not started tasks in queue)
    executor.shutdownNow();
} catch (ExecutionException e) {
    // ... something to catch ...
}

-1

试试这个,

ThreadPoolExecutor ex =
    new ThreadPoolExecutor(limit, limit, 20, TimeUnit.SECONDS, q);
for (int i = 0; i < limit; i++) {
  ex.execute(new RunnableObject(i + 1));
}

需要添加的代码行

ex.shutdown();
ex.awaitTermination(timeout, unit)

你应该检查从等待终止返回的布尔值,并不断尝试直到获得 true。 - Samuel Kerrien

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接