使用BlockingCollection<T>作为单生产者、单消费者FIFO队列是否好?

7
我需要单生产者、单消费者的FIFO查询,因为:
  • 我需要按接收顺序处理信息。
  • 我需要异步进行处理,因为调用者不应该在我处理消息时等待。
  • 下一条消息的处理应该仅在前一条消息的处理完成后开始。有时,“接收”消息的频率高于“处理”消息的频率。但是平均而言,我应该能够处理所有消息,只是有时候我必须“排队”它们。
所以这很像TCP/IP,你有一个生产者和一个消费者,有时候你可以比你处理的速度更快地接收到消息,所以你必须查询它们。其中排序很重要,而调用者完全不关心你对那些东西做了什么。
这听起来很简单,我可能可以使用通用的Queue来实现,但我想使用BlockingCollection,因为我不想写任何涉及ManualResetEvent等内容的代码。 BlockingCollection是否适合我的任务?您还能否提供其他建议?
2个回答

12
The BlockingCollection类实现了IProducerConsumerCollection接口,非常适合您的要求。
您可以创建两个任务,一个用作异步生产者,另一个作为消费者工作程序。前者将项目添加到BlockingCollection中,后者只需按FIFO顺序在新项目可用时进行消耗。
使用TPL任务BlockingCollection实现的生产者-消费者示例应用程序:
class ProducerConsumer
{
    private static BlockingCollection<string> queue = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        Start();
    }

    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());

        Task.WaitAll(producerWorker, consumerWorker);
    }

    private static void RunProducer()
    {
        int itemsCount = 100;

        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250);
        }
    }

    private static void RunConsumer()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
           Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }
}

IProducerConsumerCollection:

定义了操作线程安全集合的方法,用于生产者/消费者使用。该接口为生产者/消费者集合提供了统一的表示形式,以便更高级别的抽象(如System.Collections.Concurrent.BlockingCollection(Of T))可以将该集合用作底层存储机制。


我希望在接下来的版本中,BlockingCollection 的默认FIFO实现不会被更改,但这是另一个问题... - Oleg Vazhnev
只要您引用了IProducerConsumerCollection接口,我相信您应该不会有问题。BCL类的实现不会在这种原则性的程度上更改FIFO顺序等内容。 - sll
2
我看到Start()方法等待这两个任务完成。显然,producerWorker任务会结束,但consumerWorker永远不会结束。这应该考虑在内吗? - Rich Shealer

0

既然你需要一个队列,为什么不坚持使用队列呢?你可以使用同步队列


Martin正确,MSDN:遍历集合本质上不是线程安全的过程。即使集合已同步,其他线程仍然可以修改集合,这会导致枚举器抛出异常。为保证枚举期间的线程安全性,您可以在整个枚举期间锁定集合或捕获其他线程所做更改引起的异常。 - sll

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接