C#线程间通信

6

我希望有两个线程协作,一个是生产者,一个是消费者。 消费者比较慢,而生产者非常快速并会突发性地工作。

例如,消费者可以每20秒处理一条消息,而生产者可以在一秒内生产10条消息,但只会偶尔这样做,以便让消费者跟得上。

我希望实现如下功能:

Stream commonStream;
AutoResetEvent commonLock;

void Producer()
{
    while (true)
    {
        magic.BlockUntilMagicAvalible();
        byte[] buffer = magic.Produce();
        commonStream.Write(buffer);
        commonLock.Set();
    }
}

void Consumer()
{
    while(true)
    { 
        commonLock.WaitOne();
        MagicalObject o = binarySerializer.Deserialize(commonStream);
        DoSomething(o);
    }
}

你正在使用哪个版本的.Net?v4有一些新功能,可以完美解决这个问题。 - Scott Chamberlain
.Net 3.5; 注释长度必须至少为15个字符。 - AK_
4个回答

12

如果你使用的是 .Net 4.0 或更高版本,可以通过使用 BlockingCollection 来实现。

int maxBufferCap = 500;
BlockingCollection<MagicalObject> Collection 
                           = new BlockingCollection<MagicalObject>(maxBufferCap);
void Producer()
{
    while (magic.HasMoreMagic)
    {
        this.Collection.Add(magic.ProduceMagic());
    }
    this.Collection.CompleteAdding();
}

void Consumer()
{
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable())
    {
        DoSomthing(magicalObject);
    }
}
foreach 行会在缓冲区中没有数据时休眠,同时在集合中添加了新项后会自动唤醒。我设置最大缓冲区的原因是,如果生产者比消费者快得多,随着越来越多的对象放入集合中,您可能会消耗大量内存。通过在创建阻塞集合时设置最大缓冲区大小,当缓冲区大小达到时,生产者上的 Add 调用将被阻塞,直到消费者从集合中删除一个项为止。 BlockingCollection 类的另一个好处是它可以有任意数量的生产者和消费者,不需要是 1:1 的比例。如果 DoSomething 支持,您可以为每个计算机核心使用一个 foreach 循环(甚至使用 Parallel.ForEach 并将可消耗枚举作为数据源)。
void ConsumersInParalell()
{
    //This assumes the method signature of DoSomthing is one of the following:
    //    Action<MagicalObject>
    //    Action<MagicalObject, ParallelLoopState>
    //    Action<MagicalObject, ParallelLoopState, long>
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing);
}

2
请注意,TPL已经被移植到.NET 3.5:http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in-net-35-using.html - Dan Bryant

1

我建议阅读以下的文章,它们描述了您的问题。基本上,您的工作单元没有获得正确的隔离。

链接 链接


0
你可以使用队列和计时器来实现你想要的功能。生产者将值添加到队列中并启动消费者计时器。消费者计时器的经过事件(在线程池线程上)停止计时器,并循环遍历队列直到它为空,然后消失(没有不必要的轮询)。生产者可以在消费者仍在运行时向队列中添加内容。
System.Timers.Timer consumerTimer;
Queue<byte[]> queue = new Queue<byte[]>();

void Producer()
{
    consumerTimer = new System.Timers.Timer(1000);
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed);
    while (true)
    {
        magic.BlockUntilMagicAvailable();
        lock (queue)
        {
            queue.Enqueue(magic.Produce());
            if (!consumerTimer.Enabled)
            {
                consumerTimer.Start();
            }
        }
    }
}

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
    while (true)
    {
        consumerTimer.Stop();
        lock (queue)
        {
            if (queue.Count > 0)
            {
                DoSomething(queue.Dequeue());
            }
            else
            {
                break;
            }
        }
    }
}

它有什么不安全的地方?而且它不会轮询 - 定时器是一次性的,只有在生产者添加到队列时才会激活。 - Ed Power

-1

我使用互斥锁。其思想是两个线程同时运行。消费者线程由一个互斥锁锁定,并将无限期地等待生产者线程释放。它将并行处理数据,使生产者线程可以继续。当完成时,消费者将重新锁定。

(为简洁起见,省略了启动线程和其他优质部分的代码。)

// Pre-create mutex owned by Producer thread, then start Consumer thread.
Mutex mutex = new Mutex(true);  
Queue<T> queue = new Queue<T>();

void Producer_AddData(T data)
{
  lock (queue) {
    queue.Enqueue(GetData());
  }

  // Release mutex to start thread:
  mutex.ReleaseMutex();
  mutex.WaitOne();
}

void Consumer()
{
  while(true)
  { 
    // Wait indefinitely on mutex
    mutex.WaitOne();
    mutex.ReleaseMutex();

    T data;
    lock (queue) {
      data = queue.Dequeue();
    }
    DoSomething(data);
  }

}

这会使生产者减慢几毫秒,因为它在等待消费者唤醒并释放互斥锁。如果你可以接受这一点。


使用BlockingCollection要好得多。首先,与使用互斥量相比,它更容易确定正确性,而且与您的模型不同,生产者和消费者可以并行工作;您确保您的代码要么正在生产要么正在消费,但从不同时进行。与阻塞集合不同,它也无法很好地扩展到多个生产者或多个消费者,而在阻塞集合中这样做是微不足道的。您可以使用基于更复杂的互斥量的方法来获得阻塞集合的好处,但这将是大量的工作,并且可读性/可维护性会大大降低。 - Servy
这就是为什么这些解决方案有问题的部分原因,很难推理出正在发生的事情。你说得对,它们可以并行运行,但由于允许这种情况使得它不是线程安全的。在你入队新项目的同时也可能会移除队列中的一个项目,这可能导致各种问题的发生。 - Servy
我不想引发争论,但根据MSDN,BlockingCollection是在4.5中添加的:http://msdn.microsoft.com/en-us/library/dd267312.aspx。我发布的答案可以并行工作,对于那些需要此解决方案的人来说没有任何问题。如果您能删除您的负面评论和评分,我将不胜感激。--您是正确的,我确实需要LOCK {}队列以使其线程安全。但我在描述中已经说明了:我想展示机制如何工作,而不是构建一个可工作的应用程序。 - Ben
不,根据MSDN 它是在4.0中添加的。它在4.5中是可用的,但不仅限于4.5。这个解决方案并没有“没有问题”。由于对Queue进行不安全的并发访问,存在非常重要的竞态条件。 - Servy
1
没有理由吗?我已经发布了很多评论,解释了您的帖子存在的问题,直到看清楚您没有意图修复帖子中的问题之前,我才添加了负评。至于您的同步,对我来说这是非常重要的事情;它太容易出错了。举个例子,您没有正确地执行它,在生产者创建项目时进行锁定,从而防止消费者在处理期间获取项目。这甚至可能使消费者挨饿。 - Servy
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接