Java:生产者=消费者,如何知道何时停止?

6

我有几个使用ArrayBlockingQueue的工作者。

每个工作者从队列中取出一个对象进行处理,可能会得到多个对象作为结果,这些对象将被放入队列以供进一步处理。因此,工作者=生产者+消费者。

工作者:

public class Worker implements Runnable
{
    private BlockingQueue<String> processQueue = null;

    public Worker(BlockingQueue<String> processQueue)
    {
        this.processQueue = processQueue;
    }

    public void run()
    {
        try
        {
            do
            {
                String item = this.processQueue.take();
                ArrayList<String> resultItems = this.processItem(item);

                for(String resultItem : resultItems)
                {
                    this.processQueue.put(resultItem);
                }
            }
            while(true);
        }
        catch(Exception)
        {
            ...
        }
    }

    private ArrayList<String> processItem(String item) throws Exception
    {
        ...
    }
}

主要内容:

public class Test
{
    public static void main(String[] args) throws Exception
    {
        new Test().run();
    }

    private void run() throws Exception
    {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalala");

        Executor service = Executors.newFixedThreadPool(100);
        for(int i=0; i<100; ++i)
        {
            service.execute(new Worker(processQueue));
        }
    }
}

没有更多的工作时,最好的停止工作方式是什么?

首先,我的想法是定期检查队列中有多少项和当前处理的项。如果两者均为零,则在ExecutorService上执行类似于“shutdownNow()”的操作。但我不确定这是否是最佳方法。


更有趣的问题是,你将如何防止循环模式的出现,显然,在某些情况下队列可能会自我复制。 - andbi
请问您能否澄清使用情况?您希望应用程序在队列为空时自动终止,还是只有在收到关闭请求后,它才会在队列为空时才关闭? - Handerson
@Osw,这里的“循环模式”是什么意思? - Oleg Golovanov
@Handerson,是的,当没有更多工作时,我想终止应用程序本身。 - Oleg Golovanov
@OlegGolovanov 如果您能够发布一个简单的例子(包括生产者、消费者和执行器),那么理解您的问题可能会更容易。 - assylias
3个回答

2
如果没有更多的工作要做,将一条消息放入队列中,并让工作者们在自己方便的时候关闭。这是防止数据损坏的好方法。
如果您需要通知另一个线程所有工作者都已下班,您可以使用 CountDownLatch 来实现。

1
我无法使用“毒丸”因为我没有一个生产者知道何时没有更多的工作需要生产。 关于CountDownLatch:为了做到这一点,每个工人都必须知道何时没有更多的工作 :) - Oleg Golovanov

1
听起来你已经有了解决方案--使用一个单独的正在进行中的队列,其大小将是当前正在处理的项目数量。如果你遵循访问任一队列都在synchronized(theArrayBlockingQueue)块中的约定,那么一切都应该没问题。特别是,在将项目移动到处理状态时,在同一个同步块内从theArrayBlockingQueue中移除它并将其添加到processingQueue中。

谢谢回答 :) 我真的需要“processingQueue”吗,或者AtomicInteger(表示当前正在处理多少项)就足够了?也许我不知道某些东西。 - Oleg Golovanov
我喜欢将“状态”表示为数据结构,而不是线程的本地变量。例如,作为控制线程,我可以在theArrayBlockingQueue上进行同步,杀死所有工作线程,将工作进度队列中的项目返回到theArrayBlockingQueue,并稍后重新启动工作线程。如果您必须与线程通信以恢复其状态,则更难实现。 - karmakaze

0
我稍微修改了你的代码,不确定这是否符合你的期望,但至少它能终止!如果你使用shutdownNow而不是shutdown,你的工作线程将被中断,除非你让它们重新工作,否则将退出,不能保证队列为空。
public class Test {

    public static void main(String[] args) throws Exception {
        new Test().run();
    }

    private void run() throws Exception {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalalalalalalalalalalalala"); //a little longer to make sure there is enough to process

        ExecutorService service = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; ++i) {
            service.execute(new Worker(processQueue));
        }
        service.shutdown(); //orderly shutdown = lets the tasks terminate what they are doing
        service.awaitTermination(1, TimeUnit.SECONDS); //blocks until all tasks have finished or throws TimeOutException if timeout is reached
    }

    public static class Worker implements Runnable {

        private BlockingQueue<String> processQueue = null;
        private int count = 0;

        public Worker(BlockingQueue<String> processQueue) {
            this.processQueue = processQueue;
        }

        @Override
        public void run() {
            try {
                do {
                    //tries to get something from the queue for 100ms and returns null if it could not get anything
                    String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (item == null) break; //Ends the job because the queue was empty
                    count++;
                    List<String> resultItems = this.processItem(item);

                    for (String resultItem : resultItems) {
                        this.processQueue.put(resultItem);
                    }
                } while (true);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
                Thread.currentThread().interrupt();
            }
            if (count != 0) System.out.println(Thread.currentThread() + ": processed " + count + " entries");
        }

        private List<String> processItem(String item) { //let's put the string back less final character
            if (item.isEmpty()) {
                return Collections.<String> emptyList();
            } else {
                return Arrays.asList(item.substring(0, item.length() - 1));
            }
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接