我有一个单线程的生产者,创建一些任务对象并将它们添加到 ArrayBlockingQueue
中(大小是固定的)。
我还启动了一个多线程消费者。这是构建为固定数量线程池 (Executors.newFixedThreadPool(threadCount);
)。然后,我向该线程池提交一些 ConsumerWorker 实例,每个 ConsumerWorker 都有对上述 ArrayBlockingQueue 实例的引用。
每个这样的 Worker 将在队列上执行 take()
操作,并处理任务。
我的问题是,如何最好地让 Worker 知道没有更多任务需要完成。换句话说,我如何告诉 Workers,生产者已经完成了向队列添加任务的工作,并且从此时起,每个 Worker 在看到队列为空时都应该停止。
我现在拥有的设置是,我的 Producer 使用回调进行初始化,当它完成其工作(向队列添加任务)时触发回调。我还保留了所有已创建并提交到 ThreadPool 的 ConsumerWorker 的列表。当 Producer 回调告诉我生产者完成时,我可以告诉每个 Worker。此时,他们应该继续检查队列是否不为空,当队列变为空时,他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。就像这样
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
这里的问题是存在明显的竞态条件,有时生产者会完成,发出信号,然后消费者工作线程将在消耗队列中的所有内容之前停止。
我的问题是,最好的同步方法是什么,以便它可以正常工作?我应该同步检查生产者是否正在运行加上队列是否为空加上从队列中获取某些内容的整个部分(在队列对象上)吗?还是只需同步对ConsumerWorker实例上的isRunning布尔值的更新?有其他建议吗?
更新,下面是我最终使用的工作实现:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这受到了JohnVint在下面的回答的很大启发,仅做了一些小的修改。
===因为@vendhan的评论而更新。
感谢您的观察。 您是正确的,在本问题中的第一个代码片段中(除其他问题之外),while(isRunning || !inputQueue.isEmpty())
并没有真正意义上的意义。
在我实际的最终实现中,我做了更接近您建议的用"&&"(与)替换"||"(或)的操作,每个工作者(消费者)现在只检查他从列表中获取的元素是否为毒丸,如果是,则停止操作(因此理论上我们可以说工作者必须在运行且队列不为空的情况下才能继续操作)。