Java生产者/消费者,如何检测处理结束

3
我正在准备一个应用程序,其中单个生产者会生成数百万个任务,然后由可配置数量的消费者进行处理。从生产者到消费者的通信(可能)将基于队列实现。
从运行生产者/生成任务的线程中,我可以使用哪种方法来等待所有任务完成? 我不想恢复任何定期轮询以查看我的任务队列是否为空。无论如何,任务队列为空并不能保证最后一个任务已经完成。这些任务可能是相对长时间运行的,因此很有可能在消费者线程仍在愉快处理时任务队列为空。
此致,Maarten
4个回答

3

您可能需要查看java.util.concurrent包。

执行器框架已经提供了通过线程池执行任务的手段。 Future 抽象允许等待任务完成。

将两者结合起来,可以轻松协调执行,解耦任务、活动(线程)和结果。

例如:

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    //dispatch 
    List<Future<Void>> results =  executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    for(Future<Void> result: results){
        result.get();
    }

编辑:使用 CountDownLatch 的替代版本

    ExecutorService executorService = Executors.newFixedThreadPool(16);

    final CountDownLatch latch;

    List<Callable<Void>> tasks = null;
    //TODO: fill tasks;

    latch = new CountDownLatch(tasks.size());

    //dispatch 
    executorService.invokeAll(tasks);

    //Wait until all tasks have completed
    latch.await();

而在你的任务中:

    Callable<Void> task = new Callable<Void>()
    {

        @Override
        public Void call() throws Exception
        {
            // TODO: do your stuff

            latch.countDown(); //<---- important part
            return null;
        }
    };

是的,我已经计划使用 Executors 了。然而,除非我误解 Futures,否则我认为我不能使用它们。我可能会有几百万个任务,这意味着必须在内存中跟踪数百万个 Futures。我真的不想这样做... - Maarten Boekhold
1
你也可以使用 CountDownLatch,我添加了另一个示例。 - b_erb
哦,我喜欢CountDownLatch的想法,但是我不知道任务的数量,也不能先生成完整的任务列表,因为它不适合可用的JVM堆...有没有替代CountDownLatch的方法,基本上是一些引用计数跟踪器,在添加任务时可以执行“refcount ++”,完成任务时执行“refcount--”? - Maarten Boekhold
好的,当你不知道任务数量时,实现就会变得复杂一些。所以你可能需要实现自己的CountDownLatch,这样你可以增加计数。你还必须添加另一个方法来激活真正的障碍。这在你分派了最后一个任务的时刻是必要的。否则,由于提前排空,Latch 可能会破裂。使用AtomicInteger、一个易失的布尔值和一个内部的CountDownLatch(1)来实现应该很容易,甚至可以使用AbstractQueuedSynchronizer - b_erb
我想我要尝试使用类似于http://www.java2s.com/Tutorial/Java/0120__Development/CountUpDownLatch.htm上发布的CountUpDownLatch。除此之外,我认为该示例需要一些“synchronized”和“volatile”才能使其完全线程安全。 - Maarten Boekhold
是的,你应该这样做。目前看起来绝对不是线程安全的。 - b_erb

1

你想知道每个任务完成的位置。我会有另一个已完成任务报告的队列(每个任务一个对象/消息)。当这个计数达到你创建的任务数量时,它们都已经完成了。这个任务报告还可以包含任何任务的错误和时间信息。


嗯,有趣的想法。这将需要一个额外的线程来读取报告队列。那个报告线程可以在某个监视对象上通知生产者线程。 - Maarten Boekhold

1

你可以让每个消费者在出队时检查队列是否为空,如果是,则唤醒一个条件变量(或监视器,因为我相信Java有这个功能),主线程正在等待该变量。

让线程检查一个全局布尔变量(标记为volatile)是一种让线程知道它们应该停止的方法。


这告诉你每个任务都已出队,而不是每个任务都已完成。 - Peter Lawrey
这就是为什么第二段建议使用全局变量来告诉线程停止自己,然后加入它们。 - Robert Allan Hennigan Leahy

0

你可以使用join()方法来处理每个线程,这样在所有线程完成之前,主线程将不会结束!通过这种方式,你实际上可以找出所有线程是否已经完成!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接