生产者 - 消费者模型:消费者如何停止?

7

我已经模拟了生产者消费者问题,并且有以下代码。我的问题是:如果消费者一直处于while(true)状态,他如何停止。

在下面的代码中,我添加了

                    if (queue.peek()==null)
                         Thread.currentThread().interrupt();

这在示例中很好用。但在我的实际设计中,这不起作用(有时生产者“放置”数据需要更长的时间,因此消费者抛出的异常是不正确的)。总的来说,我知道我可以放一个“毒药”数据,例如对象是XYZ,并在消费者中进行检查。但这个“毒药”会让代码看起来很糟糕。不知道是否有其他方法。

public class ConsumerThread implements Runnable
{
 private BlockingQueue<Integer> queue;
 private String name;
 private boolean isFirstTimeConsuming = true;
 public ConsumerThread(String name, BlockingQueue<Integer> queue)
 {
    this.queue=queue;
    this.name=name;
 }

@Override
public void run()
{
    try
    {       
        while (true)
        {   
            if (isFirstTimeConsuming)
            {
                System.out.println(name+" is initilizing...");
                Thread.sleep(4000);
                isFirstTimeConsuming=false;
            }
            try{

                if (queue.peek()==null)
                    Thread.currentThread().interrupt();

                Integer data = queue.take();

                System.out.println(name+" consumed ------->"+data);
                Thread.sleep(70);    

            }catch(InterruptedException ie)
            {
                System.out.println("InterruptedException!!!!");
                break;
            }
        }

        System.out.println("Comsumer " + this.name + " finished its job; terminating.");

    }catch (InterruptedException e)
    {
        e.printStackTrace();
    } 
}

}

5个回答

10

A: 不能保证仅因为peek返回null,生产者就已经停止生产。如果生产者只是被拖慢了速度,消费者现在退出了,而生产者却继续生产。所以“peek”-> “break”的想法基本上会失败。

B:从消费者设置一个“完成/运行”标志并在生产者中读取它也会失败,如果:

  1. 消费者检查标志,发现应该继续运行,然后执行“take”操作
  2. 同时,生产者正在将标志设置为“不再运行”
  3. 现在,消费者会永远阻塞,等待幽灵数据包

相反的情况也可能发生,导致一个数据包未被消费。

为了解决这个问题,您需要对'mutexes'进行额外的同步,超过了“BlockingQueue”的范畴。

C: 我发现“Rosetta Code”是决定什么是好实践的很好的资源,在这种情况下:

http://rosettacode.org/wiki/Synchronous_concurrency#Java

生产者和消费者必须就表示输入结束的对象(或对象中的属性)达成一致。然后,生产者在最后一个数据包中设置该属性,而消费者停止使用它。也就是您在问题中提到的“毒药”。

在上面的Rosetta Code示例中,这个“对象”只是一个名为“EOF”的空String

final String EOF = new String();

// Producer
while ((line = br.readLine()) != null)
  queue.put(line);
br.close();
// signal end of input
queue.put(EOF);

// Consumer
while (true)
  {
    try
      {
        String line = queue.take();
        // Reference equality
        if (line == EOF)
          break;
        System.out.println(line);
        linesWrote++;
      }
    catch (InterruptedException ie)
      {
      }
  }

是的,这被称为“毒药”,它可以正常工作,我只是想知道是否有更好的方法。 - adhg
@adhg - 那么我的回答的重点是:这似乎是唯一简单有效的技术。对我来说,它也似乎很“干净”(尽管,“干净”是一个非常主观的问题)。 - ArjunShankar
我完全同意你的观点。最终我采用了这个解决方案。谢谢 +1 - adhg
在多线程的情况下,这个解决方案似乎不起作用。 - undefined

3

不要在线程上使用中断,而是在不再需要时退出循环:

if (queue.peek()==null)
         break;

您可以使用变量来标记关闭操作待处理,然后在跳出循环之前关闭循环。
if (queue.peek()==null)
         closing = true;

//Do further operations ...
if(closing)
  break;

谢谢,我的问题更多地涉及“终止线程”,而queue.peek()==null并不能解决它(在实际问题中不起作用)。感谢您提供不使用interrupt的提示。 - adhg

0

如果您的队列在消费者终止之前可能为空,则需要一个标志来告诉线程何时停止。添加一个设置器方法,以便生产者可以告诉消费者关闭。然后修改您的代码,使其不是:

if (queue.isEmpty())
   break;

检查你的代码

if (!run)
{
   break;
}
else if (queue.isEmpty())
{
   Thread.sleep(200);
   continue;
}

0
在现实世界中,大多数消息都带有某种类型的头部,用于定义消息类型/子类型或不同的对象。
您可以创建一个命令和控制对象或消息类型,告诉线程在收到消息时要执行某些操作(例如关闭、重新加载表格、添加新的监听器等)。
这样,您可以让命令和控制线程将消息发送到正常的消息流中。您可以让CNC线程与大型系统中的操作终端进行通信,等等。

0

你可以使用这个类型安全的模式来处理毒丸:

public sealed interface BaseMessage {

    final class ValidMessage<T> implements BaseMessage {

        @Nonnull
        private final T value;


        public ValidMessage(@Nonnull T value) {
            this.value = value;
        }

        @Nonnull
        public T getValue() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            ValidMessage<?> that = (ValidMessage<?>) o;
            return value.equals(that.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(value);
        }

        @Override
        public String toString() {
            return "ValidMessage{value=%s}".formatted(value);
        }
    }

    final class PoisonedMessage implements BaseMessage {

        public static final PoisonedMessage INSTANCE = new PoisonedMessage();


        private PoisonedMessage() {
        }

        @Override
        public String toString() {
            return "PoisonedMessage{}";
        }
    }
}

public class Producer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    Producer(@Nonnull BlockingQueue<BaseMessage> messages) {
        this.messages = messages;
    }

    @Override
    public Void call() throws Exception {
        messages.put(new BaseMessage.ValidMessage<>(1));
        messages.put(new BaseMessage.ValidMessage<>(2));
        messages.put(new BaseMessage.ValidMessage<>(3));
        messages.put(BaseMessage.PoisonedMessage.INSTANCE);
        return null;
    }
}

public class Consumer implements Callable<Void> {

    @Nonnull
    private final BlockingQueue<BaseMessage> messages;

    private final int maxPoisons;


    public Consumer(@Nonnull BlockingQueue<BaseMessage> messages, int maxPoisons) {
        this.messages = messages;
        this.maxPoisons = maxPoisons;
    }

    @Override
    public Void call() throws Exception {
        int poisonsReceived = 0;
        while (poisonsReceived < maxPoisons && !Thread.currentThread().isInterrupted()) {
            BaseMessage message = messages.take();
            if (message instanceof BaseMessage.ValidMessage<?> vm) {
                Integer value = (Integer) vm.getValue();
                System.out.println(value);
            } else if (message instanceof BaseMessage.PoisonedMessage) {
                ++poisonsReceived;
            } else {
                throw new IllegalArgumentException("Invalid BaseMessage type: " + message);
            }
        }
        return null;
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接