如何等待使用不同`ExecutorServices`创建的`Future`列表

12

好的,我知道第一个回答 / 评论会是“使用一个ExecutorServiceinvokeAll”。但是,我们将线程池分开保留有很好的理由(不详述)。

所以我有一个线程池列表(ExecutorServices),我需要做的是使用submit在每个线程池上调用不同的Callable(没有问题)。现在,我有一系列的Future实例,每个都在不同的ExecutorService上创建,并且我想等待它们全部完成(并能够提供超时,在此之后未完成的任务会被取消)。

是否存在一个现有的类来实现这个功能(封装一个Future实例列表并允许等待所有任务完成)?如果不存在,对于高效的机制的建议将不胜感激。

考虑为每个请求调用带有超时的get,但必须计算每次调用的总时间。

我看到了这篇文章Wait Until Any of Future is Done,但这扩展了Future而不是封装了它们的列表。


对于每个列表,当最后一个完成时,它就完成了 - 就像他们所说的那样。 - Mitch Connor
如果你仔细阅读那些漂亮的Java文档,你会发现你可以等待终止并设置超时时间,然后当你提交任务时,你可以发送一个关闭命令,如果任务没有在规定时间内完成,它将在超时后关闭。嗖嗖!!! - Mitch Connor
他的问题中哪里说了执行器将被关闭? - John Vint
如果这些是Guava的“ListeningExecutorService”,那么你可以这样做... - Louis Wasserman
8
如果你能在每个单独的服务中使用MoreExecutors.listeningDecorator(service),那么你可以将它们全部转变为ListeningExecutorService。然后,你可以使用Futures.allAsList创建一个“合并”的未来。 - Louis Wasserman
显示剩余4条评论
4个回答

17

根据Louis的评论,我需要的是Futures.successfulAsList

这使我能够等待所有任务完成,然后检查是否有任何失败的任务。

Guava很棒!


2

我认为JDK并没有提供直接API来实现这个功能。不过,我认为创建一个简单的方法同样很容易就能做到这点。您可能需要查看AbstractExecutorService.invokeAll()的实现来了解如何实现。

基本上,您需要调用每个future的future.get(),并随着等待结果所需的时间逐渐减少等待时间,在返回方法之前取消所有未完成的futures。


+1同意。我已经考虑到这一点并在我的帖子中包含了这个显而易见的机制。我希望有更好/已实现的机制,希望其他人知道。 - John B
这可能是你能得到的最好的实现。JDK 的创建者们本来可以提供它作为一个独立的(静态)方法,这很简单。另一种实现可能涉及使用完成服务并按任务完成的顺序等待。然而,代码变得有点复杂,但额外的好处很少。 - sjlee

1
也许我没有真正理解。然而,对我来说,它仍然听起来很简单。
public <V> List<V> get(List<Future<V>> futures, long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
    List<V> result = new ArrayList<V>();
    long end = System.nanoTime() + unit.toNanos(timeout);
    for (Future<V> f: futures) {
        result.add(f.get(end - System.nanoTime(), TimeUnit.NANOSECONDS));
    }
    return result;
}

我这样想错了吗?

我认为你所链接的问题要复杂得多,因为他们只想等待最快的,当然不知道哪个是最快的。


问题在于 get 可能会抛出异常。将其更改为在所有 Futures 完成后返回将有所帮助。此外,它还没有处理关闭操作。这是显而易见的实现方式,我想知道是否有更简洁/已经测试过的实现方式。 - John B

1

这段代码需要进行一些清理,但它应该能解决你的问题。(为了节省时间和空间,有些封装被省略了):

public static <T> LatchWithWrappedCallables<T> wrapCallables(Collection<Callable<T>> callablesToWrap)
{
    CountDownLatch latch = new CountDownLatch(callablesToWrap.size());
    List<Callable<T>> wrapped = new ArrayList<Callable<T>>(callablesToWrap.size());
    for (Callable<T> currCallable : callablesToWrap)
    {
        wrapped.add(new CallableCountdownWrapper<T>(currCallable, latch));
    }

    LatchWithWrappedCallables<T> returnVal = new LatchWithWrappedCallables<T>();
    returnVal.latch = latch;
    returnVal.wrappedCallables = wrapped;
    return returnVal;
}

public static class LatchWithWrappedCallables<T>
{
    public CountDownLatch latch;
    public Collection<Callable<T>> wrappedCallables;
}

public static class CallableCountdownWrapper<T> implements Callable<T>
{
    private final Callable<T> wrapped;

    private final CountDownLatch latch;

    public CallableCountdownWrapper(Callable<T> wrapped, CountDownLatch latch)
    {
        this.wrapped = wrapped;
        this.latch = latch;
    }

    @Override
    public T call() throws Exception
    {
        try
        {
            return wrapped.call();
        }
        finally
        {
            latch.countDown();
        }
    }
}

那么你的代码将会这样调用:

Collection<Callable<String>> callablesToWrap = [Your callables that you need to wait for here];
LatchWithWrappedCallables<String> latchAndCallables = wrapCallables(callablesToWrap);

[Submit the wrapped callables to the executors here]

if(latchAndCallables.latch.await(timeToWaitInSec, TimeUnit.SECONDS))
{
    [Handling for timeout here]
}

转念一想,我认为我可能会将await直接卷入LatchWithWrappedCallables类中,以使它更加容易。 (是的,“LatchWithWrappedCallables”是一个可怕的名称!) - Cobra1117
明天我会更深入地考虑这个问题,看它是否符合我的需求。谢谢。 - John B
不错。我也需要看一下ListenableFuture这个东西。 - John B
原来 Futures.successfulAsList 就是我要找的。 - John B

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接