使用BlockingCollection和Tasks的经典生产者消费者模式 .NET 4 TPL

20
请看以下伪代码。
//Single or multiple Producers produce using below method
    void Produce(object itemToQueue)
    {
        concurrentQueue.enqueue(itemToQueue);
        consumerSignal.set;
    }

    //somewhere else we have started a consumer like this
    //we have only one consumer
    void StartConsumer()
    {
        while (!concurrentQueue.IsEmpty())
        {
            if (concurrentQueue.TrydeQueue(out item))
            {
                //long running processing of item
            }
        }
        consumerSignal.WaitOne();
    }
我如何将我从古至今所使用的模式移植到使用taskfactory创建的任务和net 4的新信号特性上?换句话说,如果有人要使用net 4编写这个模式,它会是什么样子?伪代码可以。正如您所看到的,我已经在使用.net 4 concurrentQueue。如何使用任务并可能使用一些新的信号机制(如果可行)?谢谢。
以下是Jon/Dan提供的解决方案。不用手动信号,也不需要while(true)或while(itemstoProcess)类型的循环,简单易懂。
//Single or multiple Producers produce using below method
 void Produce(object itemToQueue)
 {
     blockingCollection.add(item);
 }

 //somewhere else we have started a consumer like this
 //this supports multiple consumers !
 task(StartConsuming()).Start; 

 void StartConsuming()
 {
     foreach (object item in blockingCollection.GetConsumingEnumerable())
     {
                //long running processing of item
     }
 }

cancellations are handled using cancel tokens

这里有一个非常好的(因为它是逐步讲解的)说明和示例,请点击链接:http://www.codethinked.com/blockingcollection-and-iproducerconsumercollection。 - MajesticRa
嗨Gullu,请看一下这段代码。它是一个简单的工作示例,演示如何使用BlockingCollection<T>实现生产者-消费者模式。此处有代码。 - sɐunıɔןɐqɐp
3个回答

26
您可以使用 BlockingCollection<T>。文档中有一个示例。
该类专门设计用于使这个问题变得微不足道。

4
Jon 给出了生产者消费者问题的 .Net 4 的标准解决方案。根据 MSDN:「为实现 IProducerConsumerCollection<T> 接口的线程安全集合提供阻塞和边界功能。」 - user7116
1
阻塞集合在消费者没有轮询BlockingCollection.IsCompleted/IsAddingCompleted时不会向消费者发出信号。在经典模式中,生产者将项目添加到队列中,通知消费者并完成操作。使用阻塞集合,我们可以将集合标记为已完成添加,这将使其处于一种状态,在此状态下,直到消费者将所有项目出列之前,无法再添加更多项目。 - Gullu
@JonSkeet - 我正在尝试实现简单的BlockingCollection用法,但不想在Task内部使用while(true)和TryTake()。我本来想使用GetConsumingEnumerable(),但这让我很困扰...根据https://msdn.microsoft.com/en-us/library/dd460684%28v=vs.110%29.aspx的说法,“不能保证生产者线程添加的项目以相同的顺序枚举。”既然如此,拥有ConcurrentQUEUE有什么意义呢?我需要按顺序处理项目。我错过了什么吗? - Dave Black
@Dave:我怀疑你忽略了ConcurrentQueue并不是唯一的并发集合。我认为使用ConcurrentQueue是没问题的。 - Jon Skeet
@Jon Skeet 我知道还有其他的并发集合。我本来期望MSDN文档在队列方面做出这种警告。 - Dave Black
显示剩余6条评论

11

你的第二段代码看起来更好。但是,启动一个Task,然后立即等待它是毫无意义的。只需调用Take,然后直接在消费线程上处理返回的项目。这就是生产者-消费者模式应该使用的方式。如果你认为工作项的处理足够密集,需要更多的消费者,那么请启动更多的消费者。BlockingCollection支持多个生产者和多个消费者。

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}

为什么在StartConsuming中我们不应该启动另一个任务或线程? - Lokeshwer
@Lokeshwer:仅仅这样是可以的。不好的是启动另一个任务,然后等待它完成。那样是毫无意义的。 - Brian Gideon
优秀的例子;我可以建议添加使用取消令牌以实现完整性。 - joelc

1
我之前使用过一种模式,可以创建一种“按需”队列消费者(基于从ConcurrentQueue中消费):
        private void FireAndForget(Action fire)
        {
            _firedEvents.Enqueue(fire);
            lock (_taskLock)
            {
                if (_launcherTask == null)
                {
                    _launcherTask = new Task(LaunchEvents);
                    _launcherTask.ContinueWith(EventsComplete);
                    _launcherTask.Start();
                }
            }
        }

        private void LaunchEvents()
        {
            Action nextEvent;

            while (_firedEvents.TryDequeue(out nextEvent))
            {
                if (_synchronized)
                {
                    var syncEvent = nextEvent;
                    _mediator._syncContext.Send(state => syncEvent(), null);
                }
                else
                {
                    nextEvent();                        
                }

                lock (_taskLock)
                {
                    if (_firedEvents.Count == 0)
                    {
                        _launcherTask = null;
                        break;
                    }
                }
            }
        }

        private void EventsComplete(Task task)
        {
            if (task.IsFaulted && task.Exception != null)
            {
                 // Do something with task Exception here
            }
        }

谢谢。但我希望Net 4新的TPL PFX能够使我发布的经典模式更易于维护/理解。你的方法对我来说有些过度了。 - Gullu
1
如果您的主要目标是易用性,Jon的答案是最好的选择;BlockingCollection专门为这种非常常见的模式设计而成(使用简单的阻塞调用Take和内置支持新的Cancellation系统)。 - Dan Bryant

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接