使用Monitor.Wait()和Monitor.Pulse()时的线程问题

4

我有一个ASP.NET中的生产者-消费者场景。我设计了一个Producer类,一个Consumer类和一个用于持有共享对象并负责生产者和消费者之间通信的类,我们称其为Mediator。因为我在启动时(在父对象中)分叉执行路径,一个线程将调用Producer.Start(),另一个线程将调用Consumer.Start(),所以我需要通过ConstructorMediator的引用传递给ProducerConsumerMediator是一个智能类,它将优化许多事情,如其内部队列的长度,但现在先将其视为循环阻塞队列。 Producer会将新对象入队Mediator,直到队列满了,然后Producer将阻塞。ConsumerMediator出队对象,直到队列中没有任何东西。为了在线程之间进行信号传递,我在Mediator类中实现了两种方法:Wait()Pulse()。代码大致如下:

Class Mediator
{
  private object _locker = new object();

  public void Wait()
  {
    lock(_locker)
      Monitor.Wait(_locker);
  }

  public void Pulse()
  {
    lock(_locker)
      Monitor.Pulse(_locker);
  }
}

// This way threads are signaling:

Class Consumer
{
  object x;
  if (Mediator.TryDequeue(out x))
    // Do something
  else
    Mediator.Wait();
}

在 Mediator 中,每当有东西被 Enqueued 或 Dequeued 时,我使用 this.Pulse() 来通知等待的线程继续工作。但是我遇到了死锁问题,因为我从未使用过这种设计来通知线程,所以我不确定是设计有问题还是我在其他地方做错了什么?谢谢。

为什么每次出队时都要调用Pulse(从而让生产者重新获取锁)?通常情况下,消费者会一直消费直到没有剩余物品,然后才调用Wait而不是Pulse。 - BrokenGlass
因为生产者正在从我不确定其速度和可用性的外部服务器接收数据。因此,为了保证服务质量(QoS),我提前缓冲了消费者5分钟的需求。 - Xaqron
6个回答

8

这里的代码量不够,但是我的猜测是你遇到了一个活锁问题。如果在Mediator.Wait之前调用了Mediator.Pulse,那么信号会丢失,即使队列中有内容。以下是实现阻塞队列的标准模式。

public class BlockingQueue<T>
{
  private Queue<T> m_Queue = new Queue<T>();

  public void Enqueue(T item)
  {
    lock (m_Queue)
    {
      m_Queue.Enqueue(item);
      Monitor.Pulse(m_Queue);
    }
  }

  public T Dequeue()
  {
    lock (m_Queue)
    {
      while (m_Queue.Count == 0) 
      {
        Monitor.Wait(m_Queue);
      }
      return m_Queue.Dequeue();
    }
  }
}

注意当队列为空时才会调用Monitor.Wait。同时还要注意它在一个while循环中被调用。这是因为Wait方法没有优先级高于Enter方法,所以新的线程可能会在Dequeue时取走最后一个元素,即使已经有一个Wait方法准备返回。如果没有循环,一个线程可能会尝试从空队列中移除元素。

4
如果您可以使用.NET 4,最好使用BlockingCollection<T>(http://msdn.microsoft.com/en-us/library/dd267312.aspx),它可以处理排队、出队和队列长度的限制。

我简化了场景。中介者是一个复杂的类,可以做更多的事情。该项目已经在.NET 4.0上运行,并且已经使用了一些功能,如ConcurrentQueue和ConcurrentBag。问题是关于生产者和消费者线程之间的信号方法。 - Xaqron
BlockingCollection<T> 几乎肯定是它们之间通信的最佳方式。除非您有非常特定的要求无法满足,否则它比尝试编写自己的信号、排队和出队代码要好得多。 - Ian Mercer
Monitor.Wait 调用时会释放锁定,如果线程被选中接收 Monitor.Pulse 信号,则重新获取锁定。 - Brian Gideon

2

设计本身没有问题。

当你不知道哪个线程会先完成其工作(生产者还是消费者),而使用 Monitor.Wait()Monitor.Pulse() 时,问题就会出现。在这种情况下,使用 AutoResetEvent 可以解决这个问题。想象一下消费者到达应该消费生产者生产的数据的部分时。也许它在生产者发出信号之前到达那里,那么一切都没问题,但如果消费者在生产者已经发出信号后到达那里呢?是的,那么你会遇到死锁,因为生产者已经为该部分调用了 Monitor.Pulse() ,并且不会重复执行。 使用 AutoResetEvent ,你可以确保消费者在那里等待来自生产者的信号,如果生产者在消费者甚至到达该部分之前已经发出信号,那么门是打开的,消费者将继续执行。

在中介者内部使用 Monitor.Wait()Monitor.Pulse() 来通知等待的线程是可以的。


1

死锁是否发生是因为Pulse没有存储任何状态?这意味着如果ProducerConsumer调用Wait之前/之后调用Pulse,那么Wait将会被阻塞。这是文档中关于Monitor.Pulse的注释。

此外,你应该知道object x = new object();是多余的 - 一个out调用将初始化x,所创建的对象将随着TryDequeue调用而超出范围。


感谢注意到我没有在 x 上使用 new。代码已编辑为正确版本 -> 对象 x; - Xaqron
也许在这种情况下使用“AutoResetEvent”更好,但它的性能是否足够好。代码应该高度优化。 - Xaqron
我认为AutoResetEvent在性能方面足够好了,它比仅仅将线程睡眠直到TryDequeue成功更好... - davisoa

1

根据提供的代码样本很难判断情况。

  • 锁是否在其他位置持有?在中介者内部?
  • 线程只是在获取锁时停留,而不是在实际的等待调用上停留吗?
  • 您是否已经在调试器中暂停了线程以查看当前状态?
  • 您是否尝试过简单的测试,例如将一个简单的单个值放入队列中并使其正常工作?或者Mediator此时相当复杂?

在Mediator类和生产者类中提供一些更详细的信息之前,这只是一些猜测。似乎某个线程可能在您不希望它持有锁时持有锁。一旦您发出脉冲信号,您确实需要通过退出“lock”范围中可能拥有它的任何线程来释放锁。因此,如果在中介者的某个地方您拥有该锁并调用Pulse,则需要退出持有锁的最外层范围,而不仅仅是Pulse中的那个范围。


生产者和消费者的Start()方法都进入了一个无限循环。当它们需要相互通信时,它们使用中介者上的方法。在调试器中一切正常,但似乎至少有一个线程在错误的时间到达某个地方。在这种情况下,使用“AutoResetEvent”是否更好,并且它的性能是否足够好? - Xaqron
我认为根本的问题仍然被忽略了。尽管AutoResetEvent可以暂时解决问题,但仍存在一些潜在问题。性能可能与设置本身无关,但是您的无限循环可能会导致锁的大量获取和释放不必要。根据您关于程序所做的所有陈述,Monitor.Wait()和Monitor.Pulse()应该足够了。由davisoa指出的MS文档提供了一个生产者/消费者示例,看起来足够简单易用。 - doobop
那篇来自 MSDN 的文章(http://msdn.microsoft.com/en-us/library/yy12yx1f%28VS.80%29.aspx)无法使用,你可以在底部找到 Brian Gideon 的最新评论。 - Xaqron
不确定那个链接是从哪里来的。我指的是 davisoa 列出的那个链接 (http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse.aspx)。 - doobop

1
你能重构成一个普通的生产者/消费者队列吗?这样就可以在一个单一的类中处理入队、出队和线程信号,因此不需要传递公共锁。通过委托可以处理出队过程。如果需要,我可以发布一个示例。

对象是复杂的,将它们全部放在一个类中会使该类难以理解和维护。 - Xaqron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接