多个生产者,单个消费者

15

我需要开发一个多线程应用程序,其中将有多个线程,每个线程都会生成自定义事件日志,需要将其保存在队列中(不是Microsoft MSMQ)。

还将有另一个线程从队列中读取日志数据并操作它,将某些信息保存到文件中。基本上,我们正在实现多生产者、单消费者的范例。

请问有人能提供如何在C++或C#中实现这一点的建议吗?

谢谢,

3个回答

17

使用定义在System.Collections.Concurrent中的BlockingCollection<T>可以很容易地完成此类操作。

基本上,您需要创建一个队列,以便所有线程都可以访问它:

BlockingCollection<LogRecord> LogQueue = new BlockingCollection<LogRecord>();
每个生产者都向队列中添加项目:
while (!Shutdown)
{
    LogRecord rec = CreateLogRecord(); // however that's done
    LogQueue.Add(rec);
}

而消费者做了类似的事情:

while (!Shutdown)
{
    LogRecord rec = LogQueue.Take();
    // process the record
}

默认情况下,BlockingCollection 使用 ConcurrentQueue<T> 作为后备存储。 ConcurrentQueue 处理线程同步,而且当尝试取出一个项目时,BlockingCollection 进行非忙等待。也就是说,如果消费者在队列中没有项目的情况下调用 Take 方法,它会进行非忙等待(没有睡眠/自旋),直到有项目可用。


2
你可以让生产者调用 CompleteAdding 方法,这会标记集合为添加完成状态(也就是说,不能再添加更多的元素)。消费者随后就可以使用 while (LogQueue.TryTake(out rec, Timeout.Infinite)) 语句,这意味着它会清空集合然后退出。当队列为空且集合已经标记为添加完成状态时,TryTake 方法会返回 False - Jim Mischel
@JimMischel 在消费者出队一个项目后,执行一些处理,如何将结果返回给调用进程? - gdp
@gdp:当线程完成某些工作时,有许多方法可以进行通知。如何进行取决于您所说的“调用进程”的含义。如果您有具体问题,请发表一个问题。 - Jim Mischel
@JimMischel 是的,你说得对。除此之外,你还需要在某个地方调用 CompleteAdding(),并且还要将 Take() 包围在一个 try-catch 结构中,处理 IOE,并且当它发生时什么也不做,只是返回。 - deostroll
1
@yBother是的,你可以使用多个Task.Run()调用。或者任何允许你启动线程的东西。你选择什么取决于你在程序的其余部分中使用了什么。 - Jim Mischel
显示剩余6条评论

2

2
你所计划的是一个经典的生产者消费者队列,其中一个线程消费队列中的项目来执行一些工作。这可以包装成一个更高级别的构造,称为“演员”或“活动对象”。
基本上,它将队列和消费项目的线程封装到一个单独的类中,其他线程在此类上异步调用所有方法,将消息放入队列中,由演员线程执行。在你的情况下,该类可以有一个名为writeData的单个方法,该方法将数据存储在队列中,并触发条件变量以通知演员线程队列中有内容。演员线程查看队列是否有任何数据,如果没有则等待条件变量。
这是一个关于该概念的好文章:http://www.drdobbs.com/go-parallel/article/showArticle.jhtml;jsessionid=UTEXJOTLP0YDNQE1GHPSKH4ATMY32JVN?articleID=225700095

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接