灵活的CountDownLatch?

30
我现在遇到了一个问题,生产者线程会产生N个工作项,将它们提交给ExecutorService,然后需要等待所有N个项目都被处理完才能继续执行。
注意事项:
- N的数量不是提前确定的,如果已知数量,我可以创建CountDownLatch并使生产者线程await直到所有任务完成。 - 使用CompletionService不合适,因为尽管我的生产线程需要阻塞(即调用take()),但没有办法表示所有工作都已完成,以便让生产线程停止等待。
我目前偏爱的解决方案是使用整数计数器。每当提交一个工作项时,就会将其增加,并在处理一个工作项时将其减小。所有N个任务提交后,我的生产者线程将需要在锁上等待,通知时检查counter == 0。如果消费线程将计数器减少并且新值为0,则需要通知生产者。
是否有更好的方法来解决这个问题,或者是否有java.util.concurrent中的适当结构可供使用,而不是“rolling my own”?
谢谢。

生产者什么时候知道有多少工作项?是在最后一个项目被生产时吗? - Ronald Wildenberg
1
你目前的解决方案可能存在竞态条件:生产物品1 -> 计数器++ -> 处理物品1 -> 计数器-- -> 生产物品2。由于计数器在生产者生产下一个物品之前已经被减少,因此生产者认为他已经准备好了。 - Ronald Wildenberg
@rwwilden:你说的情况是正确的。然而,在提交了所有工作项之后,我的生产者只会检查/等待计数器,因此在这种特定情况下它并不代表竞态条件。 - Adamski
6个回答

32

java.util.concurrent.Phaser 看起来非常适合您的需求。它计划在Java 7中发布,但最稳定版本可在 jsr166 的兴趣组网站上找到。

Phaser是一个更高级的循环障碍器。你可以注册N个参与方,并在特定阶段等待他们的前进。

以下是一个快速示例,说明它如何工作:

final Phaser phaser = new Phaser();

public Runnable getRunnable(){
    return new Runnable(){
        public void run(){
            ..do stuff...
            phaser.arriveAndDeregister();
        }
    };
}
public void doWork(){
    phaser.register();//register self
    for(int i=0 ; i < N; i++){
        phaser.register(); // register this task prior to execution 
        executor.submit( getRunnable());
    }
    phaser.arriveAndAwaitAdvance();
}

3
很酷 - 谢谢;这看起来正是我想要的...不过他们竟然把它叫做“相位器”,真让人难以置信。 - Adamski
2
如果 getRunnable() 的目的是表示一个仅向 Phaser 发出信号然后结束的任务,您需要调用 phaser.arriveAndDeregister() 而不是 phaser.arrive(),否则当父任务第二次调用 phaser.arriveAndAwaitAdvance() 时,它将死锁,因为该任务将等待仍在 Phaser 中注册的已完成任务。 - Tiago Cogumbreiro
太棒了。以前从未听说过。 - Ravindra babu

2
当然,您可以使用由AtomicReference保护的CountDownLatch,以便将您的任务包装如下:
public class MyTask extends Runnable {
    private final Runnable r;
    public MyTask(Runnable r, AtomicReference<CountDownLatch> l) { this.r = r; }

    public void run() {
        r.run();
        while (l.get() == null) Thread.sleep(1000L); //handle Interrupted
        l.get().countDown();
    }
}

请注意,任务运行其工作,然后"旋转"直到倒计时设置(即已知总任务数)。一旦倒计时被设置,它们就开始倒计时并退出。提交方式如下:
AtomicReference<CountDownLatch> l = new AtomicReference<CountDownLatch>();
executor.submit(new MyTask(r, l));

在您创建/提交工作之后,当您知道自己创建了多少个任务时
latch.set(new CountDownLatch(nTasks));
latch.get().await();

为什么将CountDownLatch包装在AtomicReference中会在这种情况下对您有所帮助?该引用不需要受到保护,因为它在构造函数中传递给MyTask,并且此后永远不会更改。 - Avi
此外,请确保在run()方法中处理任意的throwable(RuntimeExceptions和Errors),至少通过将coundDown()放置在finally{}块中来处理。 - Avi
当然,这个问题特别是针对在任务创建时不知道任务数量的情况。 - Avi
@Avi - 你必须在某个时候了解 nTasks(正如你在回复中所假设的那样)。原子引用有助于因为 CDL 必须在多个线程之间安全地设置。我的方法确实有效,你的也是。 - oxbow_lakes
1
@oxbow_lakes:你说得对 - 我在评论时没有完全理解你的代码。我认为可以通过使用wait()和notifyAll()来避免旋转。 - Avi
我知道已经过去了好几年,但是MyTask应该实现Runnable接口吗? - user1531971

1
我曾经使用过ExecutorCompletionService来处理类似这样的事情:
ExecutorCompletionService executor = ...;
int count = 0;
while (...) {
    executor.submit(new Processor());
    count++;
}

//Now, pull the futures out of the queue:
for (int i = 0; i < count; i++) {
    executor.take().get();
}

这涉及到保持已提交任务的队列,因此如果您的列表任意长,您的方法可能更好。

但一定要使用 AtomicInteger 进行协调,以便能够在一个线程中递增它,并在工作线程中递减它。


你如何使用 awaitTermination?是什么关闭了你的服务?此外,无论如何你都不能重新使用完成服务来回答,因为你可能正在等待属于另一个集合的任务。并且(如我的答案评论中所提到的),你仍然需要在某个时候知道 nTasks - oxbow_lakes
@oxbow_lakes:没错。这就是为什么我只把它作为一个选项(我已经编辑掉了,因为在这种情况下没有帮助)。 我假设这个完成服务不会同时用于另一组任务(重叠)。它可以稍后再次使用,来自同一个线程。 - Avi
使用这种跟踪任务数量的方式而不是使用AtomicBoolean的优点在于,您不必手动处理wait()和notify() - 队列会自动处理。您只需要跟踪队列中的项目数量即可。 - Avi
因此,本质上您引入了第二个队列,其中包含任务完成,生产者会对其进行消耗。一旦生产者知道它发送了所有任务,消费者就拥有发送正确信号的所有信息。在我看来,任务总数是不必建模到解决此问题的解决方案中的人为构件。 - rsp

0

这是一个独立的Java 8+方法,使用单遍PhaserStream执行此操作。 Iterator/Iterable变体完全相同。

public static <X> int submitAndWait(Stream<X> items, Consumer<X> handleItem, Executor executor) {
    Phaser phaser = new Phaser(1); // 1 = register ourselves
    items.forEach(item -> {
            phaser.register(); // new task
            executor.execute(() -> {
                handleItem.accept(item);
                phaser.arrive(); // completed task
            });
    });
    phaser.arriveAndAwaitAdvance(); // block until all tasks are complete
    return phaser.getRegisteredParties()-1; // number of items
}
...
int recognised = submitAndWait(facesInPicture, this::detectFace, executor)

顺便提一下,这对于单个事件来说是可以的,但如果同时调用,涉及到ForkJoinPool的解决方案将阻止父线程的阻塞。


0

你所描述的与使用标准信号量相当类似,但是是“反向”使用。

  • 你的信号量从0个许可开始
  • 每个工作单元在完成时释放一个许可
  • 通过等待获取N个许可来阻塞

另外,你还可以灵活地仅获取M < N个许可,这在想要检查中间状态时非常有用。例如,我正在测试异步有界消息队列,因此我希望队列在某些M < N时已满,因此我可以获取M并检查队列确实已满,然后在从队列中消耗消息后获取剩余的N - M个许可。


0

我假设您的生产者不需要知道队列何时为空,但需要知道最后一个任务何时完成。

我会在消费者中添加一个waitforWorkDone(producer)方法。生产者可以添加它的N个任务并调用等待方法。如果工作队列不为空且当前没有任务正在执行,则等待方法会阻塞传入线程。

如果消费者的任务已完成,队列为空且没有其他任务正在执行,则消费者线程会在等待锁上调用notifyAll()


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接