阻塞队列和多线程消费者,如何知道何时停止

73

我有一个单线程的生产者,创建一些任务对象并将它们添加到 ArrayBlockingQueue 中(大小是固定的)。

我还启动了一个多线程消费者。这是构建为固定数量线程池 (Executors.newFixedThreadPool(threadCount);)。然后,我向该线程池提交一些 ConsumerWorker 实例,每个 ConsumerWorker 都有对上述 ArrayBlockingQueue 实例的引用。

每个这样的 Worker 将在队列上执行 take() 操作,并处理任务。

我的问题是,如何最好地让 Worker 知道没有更多任务需要完成。换句话说,我如何告诉 Workers,生产者已经完成了向队列添加任务的工作,并且从此时起,每个 Worker 在看到队列为空时都应该停止。

我现在拥有的设置是,我的 Producer 使用回调进行初始化,当它完成其工作(向队列添加任务)时触发回调。我还保留了所有已创建并提交到 ThreadPool 的 ConsumerWorker 的列表。当 Producer 回调告诉我生产者完成时,我可以告诉每个 Worker。此时,他们应该继续检查队列是否不为空,当队列变为空时,他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。就像这样

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning || !inputQueue.isEmpty()) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
    this.isRunning = isRunning;
}

}

这里的问题是存在明显的竞态条件,有时生产者会完成,发出信号,然后消费者工作线程将在消耗队列中的所有内容之前停止。

我的问题是,最好的同步方法是什么,以便它可以正常工作?我应该同步检查生产者是否正在运行加上队列是否为空加上从队列中获取某些内容的整个部分(在队列对象上)吗?还是只需同步对ConsumerWorker实例上的isRunning布尔值的更新?有其他建议吗?

更新,下面是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{

private BlockingQueue<Produced> inputQueue;

private final static Produced POISON = new Produced(-1); 

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(true) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Produced queueElement = inputQueue.take();
            Thread.sleep(new Random().nextInt(100));
            if(queueElement==POISON) {
                break;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Consumer "+Thread.currentThread().getName()+" END");
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
    try {
        inputQueue.put(POISON);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

}

这受到了JohnVint在下面的回答的很大启发,仅做了一些小的修改。

===因为@vendhan的评论而更新。

感谢您的观察。 您是正确的,在本问题中的第一个代码片段中(除其他问题之外),while(isRunning || !inputQueue.isEmpty())并没有真正意义上的意义。

在我实际的最终实现中,我做了更接近您建议的用"&&"(与)替换"||"(或)的操作,每个工作者(消费者)现在只检查他从列表中获取的元素是否为毒丸,如果是,则停止操作(因此理论上我们可以说工作者必须在运行且队列不为空的情况下才能继续操作)。


2
一个executorService已经有了队列,所以你不需要另一个。你可以使用shutdown()启动整个executor服务。 - Peter Lawrey
9
由于ExecutorService已经有一个队列,因此您可以将任务添加到其中,而无需额外的队列,并且不需要考虑如何停止它们,因为这已经实现了。 - Peter Lawrey
@Shivan Dragon,问题:为什么你不把毒药放回队列中?如果有多个消费者,其他人将永远得不到这种毒药,也永远无法停止。 - yuris
@yuris:是的,你说得对。这段代码有一部分缺失:由于在这种情况下我已经知道有多少个消费者,当我想让它们全部停止(当生产者没有更多的产品时),生产者将向队列中添加等于消费者工作线程数量的毒丸元素。这样每个消费者都会拿到自己的毒丸。还有其他方法可以做到这一点(比如先查看,只有在元素不是毒丸时才取出。如果是毒丸,你(消费者工作者)不取它,而是简单地停止/死亡)。 - Shivan Dragon
显示剩余6条评论
6个回答

88

您应该继续从队列中进行take()操作。您可以使用毒丸(pois on pill)来告知工作进程停止。例如:

private final Object POISON_PILL = new Object();

@Override
public void run() {
    //worker loop keeps taking en element from the queue as long as the producer is still running or as 
    //long as the queue is not empty:
    while(isRunning) {
        System.out.println("Consumer "+Thread.currentThread().getName()+" START");
        try {
            Object queueElement = inputQueue.take();
            if(queueElement == POISON_PILL) {
                 inputQueue.add(POISON_PILL);//notify other threads to stop
                 return;
            }
            //process queueElement
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
    //you can also clear here if you wanted
    isRunning = false;
    inputQueue.add(POISON_PILL);
}

47
我的技术词汇表又增加了一个术语——"毒丸",点赞(+1)。 - Kiril
2
我已经尝试过这个,它运行得很好,除了一个小修改:我必须执行inputQueue.put(POISON_PILL);而不是offer(),因为如果我执行offer()并且队列此时已满(即工作线程非常懒惰),它将无法将POISON PILL元素添加到队列中。请问这样正确吗,还是我在说蠢话? - Shivan Dragon
你认为当前的设计可能会浪费大量的CPU资源吗?例如,如果队列为空,等待元素时所有线程都变得繁忙,因为没有IO中断? - Wild Goat
@Rhubarb 这是一个非常好的观点。我至少查看了代码,唯一的副作用是每个“take”获取两次锁而不是一次。虽然这是一个合理的方法。 - John Vint
是的,如果另一个消费者执行了take操作并且队列为空,则它们将等待,直到另一个线程使用相同的对象发出通知。 - John Vint
显示剩余6条评论

14

我会发送一个特殊的工作数据包给工人们,以示他们应该关闭:

public class ConsumerWorker implements Runnable{

private static final Produced DONE = new Produced();

private BlockingQueue<Produced> inputQueue;

public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
    this.inputQueue = inputQueue;
}

@Override
public void run() {
    for (;;) {
        try {
            Produced item = inputQueue.take();
            if (item == DONE) {
                inputQueue.add(item); // keep in the queue so all workers stop
                break;
            }
            // process `item`
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

要停止Worker,只需将ConsumerWorker.DONE添加到队列中。


1
我们能否使用CountDownLatch来实现,其中大小是生产者中记录的数量。每个消费者在处理记录后都会countDown。当所有任务完成时,它会越过awaits()方法。然后停止所有消费者。因为所有记录都已经被处理了。

1
在你的代码块中,当你尝试从队列中检索元素时,请使用poll(time,unit)而不是take()
try { 
    Object queueElement = inputQueue.poll(timeout,unit);
     //process queueElement        
 } catch (InterruptedException e) {
        if(!isRunning && queue.isEmpty())
         return ; 
 } 

通过指定timeout的适当值,确保线程不会因为不幸的连锁反应而持续阻塞:
  1. isRunning为真
  2. 队列变为空,因此线程进入阻塞等待状态(如果使用take())
  3. isRunning被设置为假

是的,这是我尝试过的其中一种方法,但它有一些缺点:首先,我会做很多不必要的poll()调用,然后当执行(!isRunning && queue.isEmpty())并从队列中取出东西时,我必须使用同步块将它们全部同步,这是多余的,因为BlockingQueue已经自己处理了所有这些。 - Shivan Dragon
为了避免第一个问题,你为什么不安排设置isRunning为false的线程在等待take()调用的线程上发送中断信号呢?捕获块仍然以相同的方式工作 - 这不需要单独的同步 - 除非你计划将isRunning从false重新设置为true。 - Bhaskar
我也尝试过这种方法,向你的线程发送中断信号。问题是,如果你的线程是由ExecutorService(如FixedThreadPool)启动的,当你执行executorService.shutDown()时,所有的线程都会收到InterruptedException,这将使它们在任务进行到一半时停止(因为它们现在被设置为将InterruptedException视为停止器)。此外,通过抛出异常进行通信并不是非常高效的方式。 - Shivan Dragon

0
我不得不使用多线程的生产者和消费者。最终我采用了一个 Scheduler -- N Producers -- M Consumers 的方案,每两个通过一个队列进行通信(总共两个队列)。调度程序将第一个队列填充请求以生成数据,然后再填充N个“毒丸”。有一个活动生产者计数器(原子整数),最后一个接收到最后一个毒丸的生产者会向消费者队列发送M个毒丸。

0

有许多策略可以使用,但其中一个简单的方法是创建一个任务子类来表示作业结束。生产者不会直接发送此信号,而是将此任务子类的实例加入队列中。当您的消费者取出并执行此任务时,就会发出信号。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接