等待一个Future列表

221

我有一个方法,它返回一个 List 的 future。

List<Future<O>> futures = getFutures();

现在我希望等待直到所有的future都成功处理完毕,或者任何一个由future返回输出的任务抛出异常。即使有一个任务抛出异常,也没有必要等待其他futures。

简单的方法是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

但问题在于,如果例如第4个future抛出异常,那么我将不必要地等待前3个future变为可用。

该如何解决?计数器 latch 能帮上忙吗?由于Java文档上说无法使用Future的 isDone 方法,所以我不知道该怎么做。

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

1
谁生成这些 Futures?它们是什么类型的?Java.util.concurrent.Future 接口无法提供您想要的功能,唯一的方法是使用带有回调的自己的 Futures。 - Alexei Kaigorodov
你可以为每个“批次”任务创建一个 ExecutionService 实例,将它们提交到该实例中,然后立即关闭服务并在其上使用 awaitTermination() - millimoose
2
如果您将所有未来任务的主体包装在 try..finally 中以确保计数器也被减少,那么您可以使用 CountDownLatch - millimoose
2
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html 正好满足您的需求。 - assylias
如果我将返回每个单独特性的代码包装在try catch finally中,它将可以工作,但我将无法区分成功完成未来和带有异常的未来。 - user93796
显示剩余4条评论
14个回答

167

你可以使用CompletionService在它们准备就绪时接收future,并且如果其中一个抛出异常,取消处理。像这样:

Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService = 
       new ExecutorCompletionService<SomeResult>(executor);

//4 tasks
for(int i = 0; i < 4; i++) {
   completionService.submit(new Callable<SomeResult>() {
       public SomeResult call() {
           ...
           return result;
       }
   });
}

int received = 0;
boolean errors = false;

while(received < 4 && !errors) {
      Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
      try {
         SomeResult result = resultFuture.get();
         received ++;
         ... // do something with the result
      }
      catch(Exception e) {
             //log
         errors = true;
      }
}

我认为你可以进一步改进,如果其中一个任务抛出错误,就取消任何仍在执行的任务。


1
你的代码存在和我在帖子中提到的相同问题。如果第四个future抛出异常,那么代码仍然会等待future 1、2、3完成,还是completionService.take()会返回最先完成的future? - user93796
1
超时怎么办?我能告诉完成服务最多等待X秒吗? - user93796
1
不应该有。它不会迭代未来,但是一旦有一个准备好了,它就会被处理/验证,如果没有抛出异常。 - dcernahoschi
2
为了超时等待未来出现在队列中,CompletionService 上有一个 poll(seconds) 方法。 - dcernahoschi
这里是在Github上的可工作示例:https://github.com/princegoyal1987/FutureDemo - user18853
显示剩余2条评论

159

如果你正在使用Java 8,那么你可以使用CompletableFuture和CompletableFuture.allOf更轻松地完成此操作,该方法仅在所有提供的CompletableFuture完成后应用回调。

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

3
你好@Andrejs,你能解释一下这段代码的作用吗?我在多个地方看到这个建议,但是不太明白实际上发生了什么。如果其中一个线程失败了,异常如何处理? - VSEWHGHP
2
从javadoc中:如果任何给定的CompletableFuture异常完成,则返回的CompletableFuture也会这样做,并使用CompletionException将此异常作为其原因。 - Andrejs
2
我在跟进这个问题,有没有办法使用这个代码片段获取所有已成功完成的其他线程的值?我应该只是迭代CompletableFutures列表并调用get,忽略CompletableFuture<List<T>>,因为sequence函数会确保所有线程都完成,无论是结果还是异常。 - VSEWHGHP
27
这解决的是不同的问题。如果你有“Future”实例,就不能使用这种方法。将“Future”转换为“CompletableFuture”并不容易,需要经过一定的处理。 - Jarekczek
7
为什么他们不能将所有这些逻辑放入CompletableFuture.allOf,并使其返回T而不是void?为什么我必须将它复制并粘贴到数十个项目中?Scala有Future.sequence,为什么Java没有?:'( - Cherry
显示剩余4条评论

87

在Java 8中使用CompletableFuture

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

8
这应该是被接受的答案。此外,它是Spring官方文档的一部分: https://spring.io/guides/gs/async-method/ - maaw
1
你如何取消任何超过2秒的未来任务?使用.get会像这样“堆积”:https://dev59.com/pGQm5IYBdhLWcg3wqwfn - Jason
1
@maaw的问题没有具体指定"spring",并且答案也没有展示如何构建这样的CompletableFuture。Executor接口在submit方法中指定了一个常规的Future<T>。 - Dragas

19

你可以使用ExecutorCompletionService。文档甚至有一个适用于您确切使用情况的示例:

假设您希望使用任务集的第一个非空结果,忽略任何遇到异常的结果,并在第一个任务准备好时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

需要注意的重要一点是,ecs.take()将获得第一个完成的任务,而不仅仅是第一个提交的任务。因此,您应该按照执行完成的顺序(或抛出异常的顺序)获取它们。


3
如果您想要合并一个CompletableFuture列表,可以这样做:
List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures

// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[futures.size()]));

// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();

关于Future和CompletableFuture的更多细节,有用的链接如下:
1. Future: https://www.baeldung.com/java-future
2. CompletableFuture: https://www.baeldung.com/java-completablefuture
3. CompletableFuture: https://www.callicoder.com/java-8-completablefuture-tutorial/


3
如果你正在使用Java 8,并且不想操作CompletableFuture,我编写了一个工具来使用流式处理检索List<Future<T>>的结果。关键在于禁止使用map(Future::get),因为它会抛出异常。
public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

这需要一个类似于C#的AggregateException的异常处理机制。

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

这个组件的作用类似于C#的Task.WaitAll。我正在开发一个变体,与CompletableFuture.allOf(相当于Task.WhenAll)相同。
我这样做的原因是我正在使用Spring的ListenableFuture,尽管它是一种更标准的方式,但我不想转换为CompletableFuture

1
看到需要一个等效的 AggregateException,点赞。 - granadaCoder
一个使用此功能的示例会很好。 - XDS

1
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Stack2 {   
    public static void waitFor(List<Future<?>> futures) {
        List<Future<?>> futureCopies = new ArrayList<Future<?>>(futures);//contains features for which status has not been completed
        while (!futureCopies.isEmpty()) {//worst case :all task worked without exception, then this method should wait for all tasks
            Iterator<Future<?>> futureCopiesIterator = futureCopies.iterator();
            while (futureCopiesIterator.hasNext()) {
                Future<?> future = futureCopiesIterator.next();
                if (future.isDone()) {//already done
                    futureCopiesIterator.remove();
                    try {
                        future.get();// no longer waiting
                    } catch (InterruptedException e) {
                        //ignore
                        //only happen when current Thread interrupted
                    } catch (ExecutionException e) {
                        Throwable throwable = e.getCause();// real cause of exception
                        futureCopies.forEach(f -> f.cancel(true));//cancel other tasks that not completed
                        return;
                    }
                }
            }
        }
    }
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);

        Runnable runnable1 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                }
            }
        };
        Runnable runnable2 = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(4000);
                } catch (InterruptedException e) {
                }
            }
        };


        Runnable fail = new Runnable (){
            public void run(){
                try {
                    Thread.sleep(1000);
                    throw new RuntimeException("bla bla bla");
                } catch (InterruptedException e) {
                }
            }
        };

        List<Future<?>> futures = Stream.of(runnable1,fail,runnable2)
                .map(executorService::submit)
                .collect(Collectors.toList());

        double start = System.nanoTime();
        waitFor(futures);
        double end = (System.nanoTime()-start)/1e9;
        System.out.println(end +" seconds");

    }
}

1
我有一个实用类,其中包含以下内容:
@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

一旦你拥有了它,使用静态导入,你可以像这样简单地等待所有的 futures:
futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

你也可以像这样收集他们的所有结果:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

我刚刚回顾了我的旧帖子,发现你还有另一个疑问:

但问题在于,如果第四个future抛出异常,那么我将不必要地等待前三个future可用。

在这种情况下,简单的解决方案是并行执行:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

这样做的话,第一个异常虽然不会影响后续操作,但会中断forEach语句,就像串行示例一样。但由于所有操作是并行进行的,所以您无需等待前3个操作完成。

0
 /**
     * execute suppliers as future tasks then wait / join for getting results
     * @param functors a supplier(s) to execute
     * @return a list of results
     */
    private List getResultsInFuture(Supplier<?>... functors) {
        CompletableFuture[] futures = stream(functors)
                .map(CompletableFuture::supplyAsync)
                .collect(Collectors.toList())
                .toArray(new CompletableFuture[functors.length]);
        CompletableFuture.allOf(futures).join();
        return stream(futures).map(a-> {
            try {
                return a.get();
            } catch (InterruptedException | ExecutionException e) {
                //logger.error("an error occurred during runtime execution a function",e);
                return null;
            }
        }).collect(Collectors.toList());
    };

0

CompletionService 可以使用 .submit() 方法来获取您的 Callables,并使用 .take() 方法检索计算出的 futures。

有一件事情您不能忘记,那就是通过调用 .shutdown() 方法来终止 ExecutorService。同时,只有在保存了对执行器服务的引用时才能调用此方法,因此请确保保留一个引用。

示例代码 - 对于要并行处理的固定数量的工作项:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

示例代码 - 动态数量的工作项并行处理:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接