在Java 7中,invokeAll()是一个阻塞调用吗?

9
ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<Callable<String>>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for(Future<String> future : futures){
    System.out.println("future.get = " + future.get());
}

对于这段代码,我的问题是“invokeAll()是一个阻塞调用吗”? 也就是说,当代码运行到invokeAll()这一行时,我们是否会在那里阻塞等待所有结果生成?


不可以。future.get() 会阻塞程序。 - Petr Janeček
5
实际上,我认为invokeAll是一个阻塞方法,直到列表中的所有任务都完成、失败或取消后才返回结果。引用的话是:“执行给定的任务,并在所有任务完成时返回Future列表,其中包含它们的状态和结果”——虽然我承认这句话有歧义,但我过去曾出于这个特定原因使用过它,即“运行这一堆东西,然后在你完成时让我知道”。 - MadProgrammer
幸运的是,我们可以很容易地尝试它。 - Petr Janeček
2
看起来 invokeAll 是阻塞的。我尝试了不同的执行服务,并且可以看到我的任务总是在 invokeAll 完成之前完成。这对我来说看起来是错误的。 - Stan Sokolov
3个回答

13
执行给定任务,并返回Future列表,持有它们的状态和结果,当所有任务都完成时。注意,返回列表中的每个元素都为true。如果给定的集合在此操作正在进行时被修改,则此方法的结果是未定义的。请注意,已完成的任务可能已正常终止或通过抛出异常而终止。只有当任务已经执行完毕时,才能完成Futures。
因此,该方法只有在任务执行完毕时才能返回。该方法可以抛出InterruptedException,这也表明它是一个阻塞操作。查看java.util.concurrent.AbstractExecutorService中invokeAll的实现(内联评论)。
// from OpenJDK source; GPL-2.0-with-classpath-exception
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (int i = 0, size = futures.size(); i < size; i++) {
            Future<T> f = futures.get(i);
            if (!f.isDone()) {
                try {
                    f.get(); // <== *** BLOCKS HERE ***

                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
    }
}

实际上,在这种情况下,当Javadoc-Specese似乎难以理解时,您通常应该查看参考实现。 请注意,某些实现细节不是规范的一部分。


2
您的意思是,父线程将等待使用 ExecutorService 调用创建的所有线程吗?那么答案是肯定的,父线程将会等待,一旦所有线程完成,您将获得 Futures 对象列表,该列表将保存每个线程执行的结果。
请参见下面的内容来自 ExecutorService.invokeAll()

执行给定的任务,返回一个Future列表,其中包含它们的状态和结果当全部完成时


2
这可能会曲解措辞,但是这也可以被解释为所有的 futures 只有在所有任务都完成后才更新它们的结果状态,即使 futures 已经立即返回了。 - the8472
从用户角度来看,文档中说“Future”列表将在所有任务完成后返回。从JVM内部的角度来看,是的,每个任务都会返回其结果,“Future”列表将不断构建,并且只有在所有任务完成后才会返回最终列表。 - hagrawal7777
1
我是指从用户角度来看,如果您错误地阅读了文档: “执行给定的任务,返回一个Future列表;当所有任务完成时,Future将持有它们的状态和结果。”。也就是说,所有Future的内部状态都会原子更新,但列表会立即返回。至少这就是我认为OP所困惑的地方。 - the8472
@the8472 抱歉,我无法理解您的观点,请您详细说明一下? - hagrawal7777
@the8472 解释与读者紧密耦合。 OP 想知道的和文档所说的事实是,父线程将等待所有任务完成(成功完成或由于异常而中止)。 - hagrawal7777
显示剩余2条评论

0

InvokeAll方法会阻塞,直到所有任务完成并返回Future对象列表, 解决方案: 如果我们不想这样做并继续执行程序,则可以循环遍历任务并将其传递给ExecutorService的Submit方法,并将其添加到Future对象列表中。

    ExecutorService es=Executors.newFixedThreadPool(4);
    List<SampleClassimplementingCallable<String>> tasks=new ArrayList<>();
    List<Future<String>> futures=new  ArrayList<>();
    for(SampleClassimplementingCallable<String> s:tasks)
    {   
        //This Won't Block the Calling Thread and We will get the list of futures
        futures.add(es.submit(s));
    }
    
    
    However, When the Futures are retrieved from the list and get method is called on indivual future object ,then the thread is blocked.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接