等待一个Future列表

221

我有一个方法,它返回一个 List 的 future。

List<Future<O>> futures = getFutures();

现在我希望等待直到所有的future都成功处理完毕,或者任何一个由future返回输出的任务抛出异常。即使有一个任务抛出异常,也没有必要等待其他futures。

简单的方法是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

但问题在于,如果例如第4个future抛出异常,那么我将不必要地等待前3个future变为可用。

该如何解决?计数器 latch 能帮上忙吗?由于Java文档上说无法使用Future的 isDone 方法,所以我不知道该怎么做。

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

1
谁生成这些 Futures?它们是什么类型的?Java.util.concurrent.Future 接口无法提供您想要的功能,唯一的方法是使用带有回调的自己的 Futures。 - Alexei Kaigorodov
你可以为每个“批次”任务创建一个 ExecutionService 实例,将它们提交到该实例中,然后立即关闭服务并在其上使用 awaitTermination() - millimoose
2
如果您将所有未来任务的主体包装在 try..finally 中以确保计数器也被减少,那么您可以使用 CountDownLatch - millimoose
2
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html 正好满足您的需求。 - assylias
如果我将返回每个单独特性的代码包装在try catch finally中,它将可以工作,但我将无法区分成功完成未来和带有异常的未来。 - user93796
显示剩余4条评论
14个回答

0

可以使用基于Guava的解决方案来实现Futures.FutureCombiner

以下是Javadoc中给出的代码示例:

 final ListenableFuture<Instant> loginDateFuture =
     loginService.findLastLoginDate(username);
 final ListenableFuture<List<String>> recentCommandsFuture =
     recentCommandsService.findRecentCommands(username);
 ListenableFuture<UsageHistory> usageFuture =
     Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
         .call(
             () ->
                 new UsageHistory(
                     username,
                     Futures.getDone(loginDateFuture),
                     Futures.getDone(recentCommandsFuture)),
             executor);

如需更多信息,请参阅用户指南中的ListenableFutureExplained部分。

如果您想了解其内部工作原理,建议查看源代码的此部分:AggregateFuture.java#L127-L186


0

这是我用来在一组Future上等待一段时间的方法。我认为这样更简洁。

CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// Some parallel work
        for (Something tp : somethings) {
            completionService.submit(() -> {
                try {
                   work(something)
                } catch (ConnectException e) {
                } finally {
                    countDownLatch.countDown();
                }
            });
        }    
  try {
        if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        }
    } catch (InterruptedException e) {
    }

0
也许这会有所帮助(不使用原始线程来替换任何内容,是的!) 我建议将每个 Future 对象用一个单独的线程运行(它们并行运行),然后每当其中一个出现错误,它就向经理( Handler 类)发信号。
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

我必须说,上面的代码可能会出错(没有检查),但我希望我能解释清楚解决方案。请尝试一下。


0

对于使用 Vavr 的 Future 的任何人,您可以像这样等待所有的结果:

static <T> Optional<Future<T>> waitForAll(Collection<Future<T>> futures) {
  return futures.stream()
      .reduce((last, next) -> last.flatMap(ignored -> next));

或者,如果在集合中没有未来值,则使用默认值:

static <T> Future<T> waitForAll(Collection<Future<T>> futures, T defaultValue) {
  return futures.stream()
      .reduce(Future.successful(defaultValue), (last, next) -> last.flatMap(ignored -> next));
}

这将等待所有的 futures,无论其中是否有一个失败。


如果想要在任意一个失败时返回,请将累加函数更改为:

(last, next) -> Future.firstCompletedOf(List.of(last, next))
    .flatMap(v -> last.flatMap(ignored -> next));

由于我们在合并功能中只有两个项目,所以可以等待它们中的任何一个完成(Vavr的firstCompletedOf)。 如果失败了,它将忽略flatMap并返回失败的Future。 如果成功(无论哪个成功),它都会进入第一个flatMap,在那里我们也等待另一个完成。

这对于Futures集合的长度如何都适用,因为累加器本质上将它们全部配对:

accumulate(accumulate(accumulate(1, 2), 3), 4)

accumulate 在执行“等待两个,除非其中一个失败”的操作。

警告:这不会停止其他线程的执行。


停止其他线程的执行不就是楼主的问题吗? - Jeremy Fisher
他们只是写了他们不想等待;线程可能会在后台持续运行而没有任何中断。中断线程是一个不同的问题。 - Druckles

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接