等待任何一个 Future<T> 完成。

58

我有一些异步任务正在运行,我需要等待至少其中一个完成(将来可能需要等待M个任务中的N个完成)。 目前它们被表示为Future,所以我需要类似于以下内容:

/**
 * Blocks current thread until one of specified futures is done and returns it. 
 */
public static <T> Future<T> waitForAny(Collection<Future<T>> futures) 
        throws AllFuturesFailedException

有类似的东西吗?或者类似的东西,不一定是针对Future。目前,我循环遍历Future的集合,检查是否有一个已经完成,然后睡眠一段时间再次检查。这看起来不是最好的解决方案,因为如果我睡眠的时间很长,则会添加不必要的延迟,如果我睡眠的时间很短,则可能会影响性能。

我可以尝试使用

new CountDownLatch(1)

当任务完成时,减少倒计时并执行操作。

countdown.await()

我发现只有在控制未来创建时才可能实现它。这是可能的,但需要系统重新设计,因为目前任务创建的逻辑(向ExecutorService发送Callable)与决定等待哪个Future分离。我也可以覆盖

<T> RunnableFuture<T> AbstractExecutorService.newTaskFor(Callable<T> callable)

并创建一个自定义的RunnableFuture实现,并能够附加监听器以在任务完成时得到通知,然后将这样的监听器附加到需要的任务并使用CountDownLatch,但这意味着我必须为我使用的每个ExecutorService覆盖newTaskFor - 并且可能会有不扩展AbstractExecutorService的实现。我也可以尝试包装给定的ExecutorService以达到相同的目的,但那样我就必须装饰所有产生Futures的方法。

所有这些解决方案都可能有效,但似乎非常不自然。看起来像我缺少一些简单的东西,比如

WaitHandle.WaitAny(WaitHandle[] waitHandles)

在C#中,是否有任何广为人知的解决方案来解决这种问题?

更新:

最初我根本没有访问Future的创建,因此没有优雅的解决方案。重新设计系统后,我获得了对Future创建的访问权限,并能够将countDownLatch.countdown()添加到执行过程中,然后我可以使用countDownLatch.await()并且一切正常。 感谢其他答案,我不知道ExecutorCompletionService,它确实可以在类似的任务中很有帮助,但在这种特殊情况下无法使用,因为一些Futures是在没有任何执行器的情况下创建的 - 实际任务通过网络发送到另一个服务器完成,然后接收完成通知。


4
许多人看到这个问题会更喜欢查看下面所接受的答案,而不是高票的回答,这些回答提到了ExecutorCompletionServiceExecutorService.invokeAny() - Robert Tupelo-Schneck
8个回答

55

6
这个类的文档,包括如何在第一个任务完成后取消所有其他任务的示例(如果您想这样做的话),可以在http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorCompletionService.html找到。 - Bill Michell
4
ExecutorCompletionService无法接受Futures,据我所知,这并没有回答原来的问题。 - Charlie
有趣的是,过去我总是因为回答一个单词而受到惩罚。我猜这是因为我不是一个体面的人。 - Little Alien

9

4
几乎一样。它需要一个 Collection<Callable> 而不是一个 Collection<Future> - finnw

7
为什么不创建一个结果队列并等待队列呢?或者更简单地使用CompletionService,因为它就是一个ExecutorService +结果队列。

6
这其实很容易用 wait() 和 notifyAll() 实现。
首先,定义一个锁对象。(你可以使用任何类来定义它,但我喜欢明确指出):
package com.javadude.sample;

public class Lock {}

接下来,定义您的工作线程。他必须在处理完成后通知锁对象。请注意,通知必须在锁定锁对象的同步块中进行。

package com.javadude.sample;

public class Worker extends Thread {
    private Lock lock_;
    private long timeToSleep_;
    private String name_;
    public Worker(Lock lock, String name, long timeToSleep) {
        lock_ = lock;
        timeToSleep_ = timeToSleep;
        name_ = name;
    }
    @Override
    public void run() {
        // do real work -- using a sleep here to simulate work
        try {
            sleep(timeToSleep_);
        } catch (InterruptedException e) {
            interrupt();
        }
        System.out.println(name_ + " is done... notifying");
        // notify whoever is waiting, in this case, the client
        synchronized (lock_) {
            lock_.notify();
        }
    }
}

最后,您可以编写您的客户端:
package com.javadude.sample;

public class Client {
    public static void main(String[] args) {
        Lock lock = new Lock();
        Worker worker1 = new Worker(lock, "worker1", 15000);
        Worker worker2 = new Worker(lock, "worker2", 10000);
        Worker worker3 = new Worker(lock, "worker3", 5000);
        Worker worker4 = new Worker(lock, "worker4", 20000);

        boolean started = false;
        int numNotifies = 0;
        while (true) {
            synchronized (lock) {
                try {
                    if (!started) {
                        // need to do the start here so we grab the lock, just
                        //   in case one of the threads is fast -- if we had done the
                        //   starts outside the synchronized block, a fast thread could
                        //   get to its notification *before* the client is waiting for it
                        worker1.start();
                        worker2.start();
                        worker3.start();
                        worker4.start();
                        started = true;
                    }
                    lock.wait();
                } catch (InterruptedException e) {
                    break;
                }
                numNotifies++;
                if (numNotifies == 4) {
                    break;
                }
                System.out.println("Notified!");
            }
        }
        System.out.println("Everyone has notified me... I'm done");
    }
}

你的解决方案正是我所想的。它看起来简洁易懂。+1 - Jacob Schoen

4
据我所知,Java没有类似于WaitHandle.WaitAny方法的结构。
我认为可以通过“WaitableFuture”装饰器来实现这一点:
public WaitableFuture<T>
    extends Future<T>
{
    private CountDownLatch countDownLatch;

    WaitableFuture(CountDownLatch countDownLatch)
    {
        super();

        this.countDownLatch = countDownLatch;
    }

    void doTask()
    {
        super.doTask();

        this.countDownLatch.countDown();
    }
}

尽管如此,这仅在可以在执行代码之前插入代码时才有效,否则执行代码将没有新的doTask()方法。但如果您无法以某种方式控制Future对象并在执行之前获得它,则我真的看不到任何方法可以做到这一点而不进行轮询。
或者,如果未来总是在自己的线程中运行,并且您可以以某种方式获取该线程。然后,您可以生成一个新线程以加入每个其他线程,然后在加入返回后处理等待机制...虽然这会非常丑陋,并且会引入很多开销。如果某些Future对象未完成,则可能会有许多依赖于死线程的阻塞线程。如果您不小心,这可能会泄漏内存和系统资源。
/**
 * Extremely ugly way of implementing WaitHandle.WaitAny for Thread.Join().
 */
public static joinAny(Collection<Thread> threads, int numberToWaitFor)
{
    CountDownLatch countDownLatch = new CountDownLatch(numberToWaitFor);

    foreach(Thread thread in threads)
    {
        (new Thread(new JoinThreadHelper(thread, countDownLatch))).start();
    }

    countDownLatch.await();
}

class JoinThreadHelper
    implements Runnable
{
    Thread thread;
    CountDownLatch countDownLatch;

    JoinThreadHelper(Thread thread, CountDownLatch countDownLatch)
    {
        this.thread = thread;
        this.countDownLatch = countDownLatch;
    }

    void run()
    {
        this.thread.join();
        this.countDownLatch.countDown();
    }
}

2

0

既然你不关心哪个线程先完成,为什么不为所有线程设置一个单一的WaitHandle并等待它呢?无论哪个线程先完成都可以设置该句柄。


1
我无法控制Futures的创建。并且不总是为每个Future分配单独的线程。有些任务通过网络分派到另一个JVM上执行,收到完成通知后,Future被标记为已完成。 我现在正在重新设计它,以控制Future的创建并添加countDownLatch。 - Pavel Feldman

-1

看见这个选项:

public class WaitForAnyRedux {

private static final int POOL_SIZE = 10;

public static <T> T waitForAny(Collection<T> collection) throws InterruptedException, ExecutionException {

    List<Callable<T>> callables = new ArrayList<Callable<T>>();
    for (final T t : collection) {
        Callable<T> callable = Executors.callable(new Thread() {

            @Override
            public void run() {
                synchronized (t) {
                    try {
                        t.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }, t);
        callables.add(callable);
    }

    BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(POOL_SIZE);
    ExecutorService executorService = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 0, TimeUnit.SECONDS, queue);
    return executorService.invokeAny(callables);
}

static public void main(String[] args) throws InterruptedException, ExecutionException {

    final List<Integer> integers = new ArrayList<Integer>();
    for (int i = 0; i < POOL_SIZE; i++) {
        integers.add(i);
    }

    (new Thread() {
        public void run() {
            Integer notified = null;
            try {
                notified = waitForAny(integers);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("notified=" + notified);
        }

    }).start();


    synchronized (integers) {
        integers.wait(3000);
    }


    Integer randomInt = integers.get((new Random()).nextInt(POOL_SIZE));
    System.out.println("Waking up " + randomInt);
    synchronized (randomInt) {
        randomInt.notify();
    }
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接