ExecutorCompletionService是什么?如果我们有invokeAll为什么还需要它?

49
如果我们使用一个ExecutorCompletionService,我们可以将一系列任务作为Callable提交,并通过CompletionService交互地以queue的形式获取结果。
但是,还有一个ExecutorServiceinvokeAll方法可以接受一组任务并返回一个包含结果的Future列表。
据我所知,使用这两种方式没有任何好处(除了我们使用invokeAll避免了使用for循环将任务提交到CompletionService)而且它们本质上是相同的思想,只有细微的差别。
那么为什么有两种不同的提交一系列任务的方式呢?从性能上讲,它们是否等效?有没有一种情况比另一种更适合使用?我无法想出任何一种。
4个回答

94
使用ExecutorCompletionService.poll/take,您可以按完成顺序(或多或少)接收Future。使用ExecutorService.invokeAll,您没有这个功能;您要么阻塞直到所有任务完成,要么在超时后取消未完成的任务。
static class SleepingCallable implements Callable<String> {

  final String name;
  final long period;

  SleepingCallable(final String name, final long period) {
    this.name = name;
    this.period = period;
  }

  public String call() {
    try {
      Thread.sleep(period);
    } catch (InterruptedException ex) { }
    return name;
  }
}

现在,我将演示invokeAll的工作原理:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("quick", 500),
    new SleepingCallable("slow", 5000));
try {
  for (final Future<String> future : pool.invokeAll(callables)) {
    System.out.println(future.get());
  }
} catch (ExecutionException | InterruptedException ex) { }
pool.shutdown();

这将产生以下输出:

C:\dev\scrap>java CompletionExample
... after 5 s ...
quick
slow

使用 CompletionService,我们可以看到不同的输出:

final ExecutorService pool = Executors.newFixedThreadPool(2);
final CompletionService<String> service = new ExecutorCompletionService<String>(pool);
final List<? extends Callable<String>> callables = Arrays.asList(
    new SleepingCallable("slow", 5000),
    new SleepingCallable("quick", 500));
for (final Callable<String> callable : callables) {
  service.submit(callable);
}
pool.shutdown();
try {
  Future<String> future;
  do {
    future = pool.isTerminated() ? service.poll() : service.take();
    if (future != null) {
      System.out.println(future.get());
    }
  } while (future != null);
} catch (ExecutionException | InterruptedException ex) { }

这将产生以下输出:

C:\dev\scrap>java CompletionExample
... after 500 ms ...
quick
... after 5 s ...
slow

请注意,这些时间是相对于程序启动而不是上一条消息的。

在Replit上测试一个工作示例


1
那么你的意思是,在从invokeAll返回的List<Future>中开始迭代结果时,我可以在第一个完成之前阻塞,而在ExecutionCompletion中,我会一直阻塞直到有任何一个结果可用?我的理解正确吗? - Cratylus
1
呵。我从来没有在循环条件中放置赋值语句。也许是我的小习惯吧。好答案。 :-) - Gray
“按完成顺序(或多或少)”是什么意思? - Mr_and_Mrs_D
1
@obataku 是的,但是在您的示例中,在我们调用所有未来的get之前,任务可以完成并终止池,因此可能会丢失一些结果。在“future.get()”之后模拟更长时间的处理,例如在您的示例中,“Thread.sleep(4550)”,这样您就永远看不到“slow”。 - Ivan
1
@Ivan 我明白你的意思,谢谢你的指出;现在应该解决了。 - obataku
显示剩余22条评论

20
为什么有两种不同的提交任务方式?它们在性能上是否等效?有没有一种情况比另一种更适合?我想不出来。
通过使用ExecutorCompletionService,您可以在每个作业完成时立即接收通知。相比之下,ExecutorService.invokeAll(...)在返回Future集合之前等待所有作业完成。这意味着(例如),如果除一个作业外,所有作业都在10分钟内完成,但1个作业需要30分钟,您将在30分钟后得到结果。
// this waits until _all_ of the jobs complete
List<Future<Object>> futures = threadPool.invokeAll(...);

相反,当您使用ExecutorCompletionService时,您将能够在每个作业完成后立即获取它们,从而使您能够(例如)将它们发送到另一个线程池进行处理、立即记录结果等。

ExecutorService threadPool = Executors.newFixedThreadPool(2);
ExecutorCompletionService<Result> compService
      = new ExecutorCompletionService<Result>(threadPool);
for (MyJob job : jobs) {
    compService.submit(job);
}
// shutdown the pool but the jobs submitted continue to run
threadPool.shutdown();
while (true) {
    Future<Result> future;
    // if pool has terminated (all jobs finished after shutdown) then poll() else take()
    if (threadPool.isTerminated()) {
        future = compService.poll();
        if (future == null) {
            break;
        }
    } else {
        // the take() blocks until any of the jobs complete
        // this joins with the jobs in the order they _finish_
        future = compService.take();
    }
    // this get() won't block
    Result result = future.get();
    // you can then put the result in some other thread pool or something
    // to immediately start processing it
    someOtherThreadPool.submit(new SomeNewJob(result));
}

while(!threadPool.isTerminated()) 这不就是一个繁忙的正式等待吗? - Coder
这只是 take() 块,所以它不会旋转。我回答了你的问题,@Sergio? - Gray
是的,谢谢!我正在研究如何限制Executors.newFixedThreadPool内部的阻塞队列。特别是我正在使用ListenableFuture - Coder
@Gray,我不理解你对while(!threadPool.isTerminated())的解释。为什么需要它?它有什么作用? - tinkuge
isTerminate() 如果线程池已关闭并且所有作业已完成,则返回 true。@tinkuge,这是您要问的吗? - Gray
@Gray,修改后更加清晰易懂了,谢谢! - tinkuge

3

我实际上从未使用过ExecutorCompletionService,但我认为这种情况比“正常”的ExecutorService更有用的情况是当你想按完成顺序接收已完成任务的Futures时。使用invokeAll时,您只会得到一个列表,其中可以包含任何给定时间的不完整和已完成任务的混合。


1

只考虑结果顺序进行比较:

当我们使用CompletionService时,每当提交的作业完成时,结果将被推送到队列中(完成顺序)。然后,提交的作业和返回的结果的顺序不再相同。因此,如果您关心任务执行的顺序,请使用CompletionService

invokeAll返回表示已完成任务的Future列表,与给定任务列表的迭代器产生的顺序相同。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接