C#生产者/消费者

32

我最近接触了一个C#生产者/消费者模式的实现,它非常简单和优雅(至少对我来说是这样)。

这个实现似乎是在2006年左右设计的,所以我想知道这个实现是否:
- 安全
- 仍然适用

以下是代码(原始代码参考自http://bytes.com/topic/net/answers/575276-producer-consumer#post2251375):

using System;  
using System.Collections;  
using System.Threading;

public class Test
{  
    static ProducerConsumer queue;

    static void Main()
    {
        queue = new ProducerConsumer();
        new Thread(new ThreadStart(ConsumerJob)).Start();

        Random rng = new Random(0);
        for (int i=0; i < 10; i++)
        {
            Console.WriteLine ("Producing {0}", i);
            queue.Produce(i);
            Thread.Sleep(rng.Next(1000));
        }
    }

    static void ConsumerJob()
    {
        // Make sure we get a different random seed from the
        // first thread
        Random rng = new Random(1);
        // We happen to know we've only got 10 
        // items to receive
        for (int i=0; i < 10; i++)
        {
            object o = queue.Consume();
            Console.WriteLine ("\t\t\t\tConsuming {0}", o);
            Thread.Sleep(rng.Next(1000));
        }
    }
}

public class ProducerConsumer
{
    readonly object listLock = new object();
    Queue queue = new Queue();

    public void Produce(object o)
    {
        lock (listLock)
        {
            queue.Enqueue(o);

            // We always need to pulse, even if the queue wasn't
            // empty before. Otherwise, if we add several items
            // in quick succession, we may only pulse once, waking
            // a single thread up, even if there are multiple threads
            // waiting for items.            
            Monitor.Pulse(listLock);
        }
    }

    public object Consume()
    {
        lock (listLock)
        {
            // If the queue is empty, wait for an item to be added
            // Note that this is a while loop, as we may be pulsed
            // but not wake up before another thread has come in and
            // consumed the newly added object. In that case, we'll
            // have to wait for another pulse.
            while (queue.Count==0)
            {
                // This releases listLock, only reacquiring it
                // after being woken up by a call to Pulse
                Monitor.Wait(listLock);
            }
            return queue.Dequeue();
        }
    }
}

你打算用哪个版本的.NET? - James Black
2
嗨lboregard,这个链接中有一个使用BlockingCollection<T>实现简单的生产者-消费者模式的工作示例。 - sɐunıɔןɐqɐp
7个回答

40

这段代码比.NET 2.0发布的时间还要早,是我写的。不过生产者/消费者队列的概念比这个要古老得多 :)

据我所知,这段代码是安全的,但它存在一些缺陷:

  • 它是非泛型的。现代版本肯定会是泛型的。
  • 它没有停止队列的方法。停止队列的一种简单方法(以便所有消费者线程都退役)是有一个“停止工作”标记可以放入队列中。您然后添加与线程数相同数量的标记。或者,您可以有一个单独的标志来指示您想要停止。(这允许其他线程在完成队列中的所有当前工作之前停止。)
  • 如果作业非常小,则逐个消耗作业可能并不是最有效的。

说实话,这段代码背后的思想比代码本身更重要。


脉冲并不一定会唤醒消费者,我知道这不太可能,但理论上只有生产者可以无限运行。此外,我已经测量过,监视器等待/脉冲与事件等待句柄相比没有性能优势。 - TakeMeAsAGuest
1
@TakeMeAsAGuest: 不太确定你说的第一部分是什么意思 - 你是否说过你见过有线程在等待监视器时,但脉冲没起到作用?至于性能 - 有很多不同的情况需要考虑(硬件、软件、等待线程的数量等)。我会看看能否在Joe Duffy的书中找到一些参考资料... - Jon Skeet

32
您可以尝试使用以下代码片段。它是通用的,并且有一种方法可以将null(或任何您想使用的标志)加入队列,以告诉工作线程退出。
该代码摘自此处:http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication1
{

    public class TaskQueue<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();

        public TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];

            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }

        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }

        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }

        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                Console.Write(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

3
这是迄今为止最好的生产者-消费者模式实现。我最近在我的多线程应用程序中使用它,即使在1000-1500个线程下也能运行顺畅。 - Sumit Ghosh
我确定我在这里漏掉了什么(我的C#技能非常生疏),但是这个示例没有调用已消耗的task上的方法(与消费者中存储和调用Action委托的非泛型代码不同)。那么,如果它不能演示对已消耗任务的方法调用,那么泛型的意义是什么?(试图理解此示例与引用实现之间的区别。) - Inactivist
2
@Inactivist 你可以在构造函数中添加一个 Action<T>,将其保存为 _action 字段,并在 Dequeue() 后立即调用它,代码如下:_action(task); - Marcus
1
是的,正如@Inactivist所说,只需在构造函数中传递一个Action<T>,并将出队的对象传递给它(在锁定之外)。抱歉我错过了这一点。顺便说一句,现在所有这些都有点冗余了,因为使用blockingcollections / concurrentqueues和parallel libraries甚至是RX更容易! - dashton
1
你能否根据Marcus提到的正确代码更新你的答案? - FaNIX

26

早些时候,我从上面这段代码和来自文章系列中学到了Monitor.Wait/Pulse的工作原理(以及有关线程的许多内容)。正如Jon所说,它具有很大的价值,而且确实安全可行。

然而,在.NET 4中,框架中有一种生产者-消费者队列实现。直到最近我才发现它,但在此之前它已经做到了我需要的一切。


2
现在有一个更现代的选项,使用命名空间 System.Threading.Tasks.Dataflow。它是异步/等待友好的,并且更加通用。
更多信息请查看如何实现生产者-消费者数据流模式
它从 .Net Core 开始包含,对于旧版本的 .Net,您可能需要安装与命名空间相同名称的软件包。
我知道这个问题很老,但它是我请求的谷歌搜索结果中的第一个匹配项,所以我决定更新主题。

1

在C#中实现生产者/消费者模式的一种现代且简单的方法是使用System.Threading.Channels。它是异步的,使用ValueTask来减少内存分配。以下是一个示例:

public class ProducerConsumer<T>
{
    protected readonly Channel<T> JobChannel = Channel.CreateUnbounded<T>();

    public IAsyncEnumerable<T> GetAllAsync()
    {
        return JobChannel.Reader.ReadAllAsync();
    }

    public async ValueTask AddAsync(T job)
    {
        await JobChannel.Writer.WriteAsync(job);
    }

    public async ValueTask AddAsync(IEnumerable<T> jobs)
    {
        foreach (var job in jobs)
        {
            await JobChannel.Writer.WriteAsync(job);
        }
    }
}

0
public class ProducerConsumerProblem
    {
        private int n;
        object obj = new object();
        public ProducerConsumerProblem(int n)
        {
            this.n = n;
        }

        public void Producer()
        {

            for (int i = 0; i < n; i++)
            {
                lock (obj)
                {
                    Console.Write("Producer =>");
                    System.Threading.Monitor.Pulse(obj);
                    System.Threading.Thread.Sleep(1);
                    System.Threading.Monitor.Wait(obj);
                }
            }
        }

        public void Consumer()
        {
            lock (obj)
            {
                for (int i = 0; i < n; i++)
                {
                    System.Threading.Monitor.Wait(obj, 10);
                    Console.Write("<= Consumer");
                    System.Threading.Monitor.Pulse(obj);
                    Console.WriteLine();
                }
            }
        }
    }

    public class Program
    {
        static void Main(string[] args)
        {
            ProducerConsumerProblem f = new ProducerConsumerProblem(10);
            System.Threading.Thread t1 = new System.Threading.Thread(() => f.Producer());
            System.Threading.Thread t2 = new System.Threading.Thread(() => f.Consumer());
            t1.IsBackground = true;
            t2.IsBackground = true;
            t1.Start();
            t2.Start();
            Console.ReadLine();
        }
    }

output

Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer
Producer =><= Consumer

0

警告:如果您阅读评论,您会发现我的答案是错误的 :)

您的代码中可能存在死锁

想象以下情况,为了清晰起见,我使用了单线程方法,但应该很容易转换为多线程方法并加入睡眠:

// We create some actions...
object locker = new object();

Action action1 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action1");
    }
};

Action action2 = () => {
    lock (locker)
    {
        System.Threading.Monitor.Wait(locker);
        Console.WriteLine("This is action2");
    }
};

// ... (stuff happens, etc.)

// Imagine both actions were running
// and there's 0 items in the queue

// And now the producer kicks in...
lock (locker)
{
    // This would add a job to the queue

    Console.WriteLine("Pulse now!");
    System.Threading.Monitor.Pulse(locker);
}

// ... (more stuff)
// and the actions finish now!

Console.WriteLine("Consume action!");
action1(); // Oops... they're locked...
action2();

如果这个不太清楚,请告诉我。

如果确认了,那么你的问题的答案是:“不,不安全”;) 希望这可以帮到你。


我没有看到原帖作者的代码中存在任何死锁问题,因为项目可以在每个消费者都在锁外的时候添加到队列中(在这种情况下,消费者下次获取锁定时会注意到队列不为空,因此它不必等待)或者等待脉冲(在这种情况下,脉冲保证会唤醒一个消费者)。 - supercat
我必须承认,我大部分都忘记了我当时的想法细节。回过头来看,我相信问题出在 while 循环上。它会一直锁定消费者线程,直到队列中有东西,这会阻止生产者锁定并入队。这有意义吗? - DiogoNeves
1
Monitor.Wait() 方法会释放它等待的锁,直到其他线程发出脉冲信号。消费者线程将在 Consume 方法中被阻塞,但这不会阻止生产者提供数据。最大的危险是,如果生产者在生成消费者期望的所有数据之前退出,消费者将永远等待永远不会到达的东西。可以通过设置一个 AllDone 标志来解决这个问题。 - supercat
1
如果最后一个生产者设置了 AllDone 并脉冲监视器,而 Consume 方法在其 while 循环条件中检查 AllDone,并且在看到 AllDone 脉冲监视器后退出(通过返回空或抛出异常),那么即使有多个消费者(它们将以任意顺序处理队列项)等待,所有等待的消费者都会被唤醒并告知退出。 - supercat

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接