使用队列的生产者/消费者线程

68
我想创建一种类似于生产者/消费者的线程应用程序,但我不确定在它们之间实现队列的最佳方法是什么。
所以我想到了两个想法(可能都是错误的)。我想知道哪一个更好,如果它们都不好,那么最佳的队列实现方法是什么。这些示例中,我主要关心队列的实现方式。我正在扩展一种队列类,它是内部类并且是线程安全的。下面是两个示例,每个示例有4个类。
主类 -
public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
} 

消费者类 -

public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类 -

public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类 -

public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

或者

主类-

public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
} 

消费者类 -

public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

生产者类 -

public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类 -

//the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{    
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

开始吧!


1
生产者入队,消费者出队,顺序不可颠倒。 - Enno Shioji
4
不要在构造函数中启动线程!该线程可能会观察到对象处于不一致的状态。有关详细信息,请参阅《Java并发实践》。 - Enno Shioji
谢谢Zwei,enqueue的问题是我注意力不集中。从构造函数开始线程的事情,我应该在初始化方法中运行它,还是应该从主方法类中启动它? - Gareth
8个回答

82

在Java 5+中,您拥有完成此类操作所需的所有工具。 您需要:

  1. 将所有生产者放入一个ExecutorService中;
  2. 将所有消费者放入另一个 ExecutorService 中;
  3. 如有必要,使用BlockingQueue之间进行通信。

对于步骤(3),我说“如果必要”,因为根据我的经验,这是一步不必要的。您只需向消费者executor service提交新任务即可。因此:

final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
所以生产者直接提交给消费者

4
Cletus对于提供更多信息以帮助澄清“如何开始”是正确的。 http://java.sun.com/docs/books/tutorial/essential/concurrency/ - edwardsmatt
“因此,生产者直接向消费者提交” - 在并行调用 consumers.submit(...) 是安全的吗?还是需要在其周围进行同步? - Marsellus Wallace
如果您共享一个BlockingQueue,那么生产者和消费者是否可以只使用一个执行器呢? - devo
4
值得注意的是,如果“消费者”进程出现故障,导致需要重新处理数据,那么引入一个阻塞队列是值得的,这样处于错误状态的消费者可以将数据放回队列,以便其他消费者重新处理。 - mooreds

20

好的,正如其他人所指出的那样,最好的做法是使用java.util.concurrent包。我强烈推荐《Java并发编程实战》这本书,它覆盖了几乎你需要知道的所有内容。

至于你的具体实现,在我评论中指出,不要从构造函数中启动线程——这可能是不安全的。

撇开这一点不谈,第二种实现似乎更好。你不想将队列放在静态字段中。你可能只是为了毫无意义地失去灵活性。

如果您想继续使用自己的实现(我猜是为了学习?),至少提供一个start()方法。您应该构造对象(可以实例化Thread对象),然后调用start()以启动线程。

编辑:ExecutorService有自己的队列,因此可能会让人感到困惑...以下是一些让您入门的内容。

public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);   
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

进一步编辑: 对于生产者,你可以使用类似以下的代码,而不是while(true)

@Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

您可以通过调用.shutdownNow()来关闭执行器,如果使用while(true),则无法关闭。

此外,请注意Producer仍然容易受到RuntimeExceptions(即一个RuntimeException将停止处理)。


那么我应该在消费者(Consumer)和生产者(Producer)中添加一个start()方法吗?你是说我应该在我的主方法(main method)中放置类似这样的代码呢?consumer = new Consumer(); consumer.start(readQ);还是这样呢?consumer = new Comsumer(readQ); consumer.start(); - Gareth
1
通常情况下,您会执行new Comsumer(readQ); consumer.start();。在您的情况下,建议将队列声明为private final,并且如果这样做,您需要在构造函数中设置队列。如果这是生产代码,我强烈建议您采用cletus的答案。如果您绝对需要使用内部队列,则应使用ExecutorService executor = Executors.newSingleThreadExecutor()而不是原始线程。这将在其他事项之间保护您免受RuntimeException停止系统的影响。 - Enno Shioji
谢谢。非常有帮助。我已经采用了像cletus建议的BlockingQueue而不是内部队列。仍在努力理解ExecutorService类,但当我理解后一定会使用它。感谢您的帮助。 - Gareth

15

我已经将Cletus提出的答案扩展为可工作的代码示例。

  1. 一个ExecutorService(pes)接受Producer任务。
  2. 一个ExecutorService(ces)接受Consumer任务。
  3. 生产者和消费者都共享一个BlockingQueue
  4. 多个Producer任务生成不同的数字。
  5. 任何一个Consumer任务都可以消费由Producer生成的数字。

代码:

import java.util.concurrent.*;

public class ProducerConsumerWithES {
    public static void main(String args[]){
         BlockingQueue<Integer> sharedQueue = new LinkedBlockingQueue<Integer>();

         ExecutorService pes = Executors.newFixedThreadPool(2);
         ExecutorService ces = Executors.newFixedThreadPool(2);

         pes.submit(new Producer(sharedQueue,1));
         pes.submit(new Producer(sharedQueue,2));
         ces.submit(new Consumer(sharedQueue,1));
         ces.submit(new Consumer(sharedQueue,2));
         // shutdown should happen somewhere along with awaitTermination
         / * https://dev59.com/hVoV5IYBdhLWcg3wIb74#36644320 */
         pes.shutdown();
         ces.shutdown();
    }
}
class Producer implements Runnable {
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Producer(BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.threadNo = threadNo;
        this.sharedQueue = sharedQueue;
    }
    @Override
    public void run() {
        for(int i=1; i<= 5; i++){
            try {
                int number = i+(10*threadNo);
                System.out.println("Produced:" + number + ":by thread:"+ threadNo);
                sharedQueue.put(number);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable{
    private final BlockingQueue<Integer> sharedQueue;
    private int threadNo;
    public Consumer (BlockingQueue<Integer> sharedQueue,int threadNo) {
        this.sharedQueue = sharedQueue;
        this.threadNo = threadNo;
    }
    @Override
    public void run() {
        while(true){
            try {
                int num = sharedQueue.take();
                System.out.println("Consumed: "+ num + ":by thread:"+threadNo);
            } catch (Exception err) {
               err.printStackTrace();
            }
        }
    }   
}

输出:

Produced:11:by thread:1
Produced:21:by thread:2
Produced:22:by thread:2
Consumed: 11:by thread:1
Produced:12:by thread:1
Consumed: 22:by thread:1
Consumed: 21:by thread:2
Produced:23:by thread:2
Consumed: 12:by thread:1
Produced:13:by thread:1
Consumed: 23:by thread:2
Produced:24:by thread:2
Consumed: 13:by thread:1
Produced:14:by thread:1
Consumed: 24:by thread:2
Produced:25:by thread:2
Consumed: 14:by thread:1
Produced:15:by thread:1
Consumed: 25:by thread:2
Consumed: 15:by thread:1

注意:如果您不需要多个生产者和消费者,请保持单个生产者和消费者。我添加了多个生产者和消费者来展示阻塞队列在多个生产者和消费者之间的能力。


这并没有解决多个生产者和消费者存在时的竞态条件问题。每个人都认为容量为0并尝试添加。如果只有一个生产者和一个消费者,则不需要在BlockingQueue上进行同步,但如果超过一个,则仍然需要同步。 - Cleonjoys
你可以尝试一件事情,注释掉消费者,然后为BlockingQueue设置固定大小,你会看到结果。我用了你的代码和new LinkedBlockingQueue<Integer>(2);进行了测试。输出如下:Produced:11:by thread:1 Produced:21:by thread:2 Produced:22:by thread:2 Produced:12:by thread:1当队列的容量被设置为2时,如何插入更多的值呢? - Cleonjoys
这就是BlockingQueue的本质。除非有可用的容量,否则它将被阻塞。我正在使用无界阻塞队列,因此上述情况不会出现。即使由于有界阻塞队列而出现了问题,这也是Java实现的方式。请查看https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/LinkedBlockingQueue.html#put-E-。我发布的代码片段没有任何问题。 - Ravindra babu
你想如何在你的代码中设置竞争条件检查呢?如果我将BlockingQueue设置为大小为2,则它不应该接受超过2个元素,但是你的代码接受了超过2个元素。BlockingQueue会在下溢和上溢时自动等待,但不能保证多个线程之间的同步。 - Cleonjoys
你非常错误。将调试语句移动到put之后:System.out.println("Produced:" + number + ":by thread:"+ threadNo);。只插入两个元素。我的输出现在是:Produced:11:by thread:1 Produced:21:by thread:2。阻塞队列是线程安全的。 - Ravindra babu
这篇文章不应该被踩,因为你的假设是错误的。当容量不可用时,竞态条件与BlockingQueue.wait()不同。我的BlockingQueue是无界的,代码没有任何问题。运行此程序1000次,您永远不会遇到竞态条件。 - Ravindra babu

7

你正在重复造轮子。

如果你需要持久性和其他企业特性,请使用JMS(我建议使用ActiveMq)。

如果你需要快速的内存队列,请使用java的Queue中的一个实现。

如果你需要支持java 1.4或更早版本,请使用Doug Lea出色的concurrent包。


7
你仍然可能会在面试中被要求实现生产者消费者问题 :) - Martin Konicek
我确实发现java.util.concurrent中的工具很有用,但是我发现当它仍然强制我传递两个参数来指定超时时,很难称其为“优秀”。Doug会不会因为创建一个名为Duration的类而死亡? - Hakanai

2

这是非常简单的代码。

import java.util.*;

// @author : rootTraveller, June 2017

class ProducerConsumer {
    public static void main(String[] args) throws Exception {
        Queue<Integer> queue = new LinkedList<>();
        Integer buffer = new Integer(10);  //Important buffer or queue size, change as per need.

        Producer producerThread = new Producer(queue, buffer, "PRODUCER");
        Consumer consumerThread = new Consumer(queue, buffer, "CONSUMER");

        producerThread.start();  
        consumerThread.start();
    }   
}

class Producer extends Thread {
    private Queue<Integer> queue;
    private int queueSize ;

    public Producer (Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super(ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.size() == queueSize){
                    System.out.println(Thread.currentThread().getName() + " FULL         : waiting...\n");
                    try{
                        queue.wait();   //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue empty then produce one, add and notify  
                int randomInt = new Random().nextInt(); 
                System.out.println(Thread.currentThread().getName() + " producing... : " + randomInt); 
                queue.add(randomInt); 
                queue.notifyAll();  //Important
            } //synchronized ends here : NOTE
        }
    }
}

class Consumer extends Thread {
    private Queue<Integer> queue;
    private int queueSize;

    public Consumer(Queue<Integer> queueIn, int queueSizeIn, String ThreadName){
        super (ThreadName);
        this.queue = queueIn;
        this.queueSize = queueSizeIn;
    }

    public void run() {
        while(true){
            synchronized (queue) {
                while(queue.isEmpty()){
                    System.out.println(Thread.currentThread().getName() + " Empty        : waiting...\n");
                    try {
                        queue.wait();  //Important
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }

                //queue not empty then consume one and notify
                System.out.println(Thread.currentThread().getName() + " consuming... : " + queue.remove());
                queue.notifyAll();
            } //synchronized ends here : NOTE
        }
    }
}

1
  1. Java代码中的"BlockingQueue"具有同步的put和get方法。
  2. Java代码中的"Producer",生产者线程用于生产数据。
  3. Java代码中的"Consumer",消费者线程用于消费生产的数据。
  4. Java代码中的"ProducerConsumer_Main",主函数用于启动生产者和消费者线程。

BlockingQueue.java

public class BlockingQueue 
{
    int item;
    boolean available = false;

    public synchronized void put(int value) 
    {
        while (available == true)
        {
            try 
            {
                wait();
            } catch (InterruptedException e) { 
            } 
        }

        item = value;
        available = true;
        notifyAll();
    }

    public synchronized int get()
    {
        while(available == false)
        {
            try
            {
                wait();
            }
            catch(InterruptedException e){
            }
        }

        available = false;
        notifyAll();
        return item;
    }
}

Consumer.java

package com.sukanya.producer_Consumer;

public class Consumer extends Thread
{
    blockingQueue queue;
    private int number;
    Consumer(BlockingQueue queue,int number)
    {
        this.queue = queue;
        this.number = number;
    }

    public void run()
    {
        int value = 0;

        for (int i = 0; i < 10; i++) 
        {
            value = queue.get();
            System.out.println("Consumer #" + this.number+ " got: " + value);
        }
    }
}

ProducerConsumer_Main.java

package com.sukanya.producer_Consumer;

public class ProducerConsumer_Main 
{
    public static void main(String args[])
    {
        BlockingQueue queue = new BlockingQueue();
        Producer producer1 = new Producer(queue,1);
        Consumer consumer1 = new Consumer(queue,1);
        producer1.start();
        consumer1.start();
    }
}

4
没有解释的代码转储很少有帮助。 - Chris

0
  public class QueueHandler
{
 //winstead of Queue<Object> will replace  BlockingQueue <String> queue = new LinkedBlockingQueue <> ();
public static Queue<Object> readQ = new Queue<Object>(100);

public static void enqueue(Object object)
{
  
    readQ.add(object);
}

public static Object dequeue()
{
   
    return readQ.get();
}
}

public static BlockingQueue <String> queue = new LinkedBlockingQueue <> (); 

如果它是静态的,它可以工作,但如果它是非静态的,它可能无法正常工作。如何解决?


如果您有新的问题,请通过单击提问按钮来提出。如果它有助于提供上下文,请包含此问题的链接。- 来自审核 - Procrastinator

0
请使用这个类型安全的设计模式,包含毒药丸机制:
public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接