线程安全的FIFO队列/缓冲区

12

我需要实现一种任务缓冲区。基本要求如下:

  • 使用单个后台线程处理任务
  • 从多个线程接收任务
  • 处理所有接收到的任务,即确保在接收到停止信号后缓冲区中没有任何任务
  • 必须维护每个线程接收到的任务的顺序

我考虑使用类似下面的队列来实现。对该实现有什么建议?还有其他更好的实现方法吗?

public class TestBuffer
{
    private readonly object queueLock = new object();
    private Queue<Task> queue = new Queue<Task>();
    private bool running = false;

    public TestBuffer()
    {
    }

    public void start()
    {
        Thread t = new Thread(new ThreadStart(run));
        t.Start();
    }

    private void run()
    {
        running = true;

        bool run = true;
        while(run)
        {
            Task task = null;
            // Lock queue before doing anything
            lock (queueLock)
            {
                // If the queue is currently empty and it is still running
                // we need to wait until we're told something changed
                if (queue.Count == 0 && running)
                {
                    Monitor.Wait(queueLock);
                }

                // Check there is something in the queue
                // Note - there might not be anything in the queue if we were waiting for something to change and the queue was stopped
                if (queue.Count > 0)
                {
                    task = queue.Dequeue();
                }
            }

            // If something was dequeued, handle it
            if (task != null)
            {
                handle(task);
            }

            // Lock the queue again and check whether we need to run again
            // Note - Make sure we drain the queue even if we are told to stop before it is emtpy
            lock (queueLock)
            {
                run = queue.Count > 0 || running;
            }
        }
    }

    public void enqueue(Task toEnqueue)
    {
        lock (queueLock)
        {
            queue.Enqueue(toEnqueue);
            Monitor.PulseAll(queueLock);
        }
    }

    public void stop()
    {
        lock (queueLock)
        {
            running = false;
            Monitor.PulseAll(queueLock);
        }
    }

    public void handle(Task dequeued)
    {
        dequeued.execute();
    }
}

可能是Queues And Wait Handles in C#的重复问题。 - Hans Passant
你想在调用stop()之后允许入队吗? - bmm6o
5个回答

8
您可以使用开箱即用的BlockingCollection来处理此问题。 BlockingCollection旨在具有一个或多个生产者和一个或多个消费者。 在您的情况下,您将有多个生产者和一个消费者。当您收到停止信号时,请执行以下操作:
  • 向生产者线程发送停止信号
  • 在BlockingCollection实例上调用CompleteAdding
消费者线程将继续运行,直到删除并处理所有排队的项目,然后它将遇到BlockingCollection已完成的条件。当线程遇到该条件时,它会退出。

谢谢您的建议。不幸的是,我只能使用.Net 3.5,这就排除了BlockingCollection。很抱歉,我应该在要求中提到这一点;) - user1300560

7

您应该考虑使用FIFO的ConcurrentQueue。如果不适合,可以尝试使用线程安全集合中的其他相关类。使用这些类可以避免一些风险。


1

看看我的轻量级线程安全FIFO队列实现,它是一种非阻塞同步工具,使用线程池比大多数情况下创建自己的线程和使用锁和互斥量等阻塞同步工具更好。https://github.com/Gentlee/SerialQueue

用法:

var queue = new SerialQueue();
var result = await queue.Enqueue(() => /* code to synchronize */);

根据指南Enqueue方法应该被命名为EnqueueAsync - Theodor Zoulias
@TheodorZoulias 为什么 Task.Run 和 TaskFactory.StartNew 没有 async 呢?Task.ContinueWith 也是一样的情况。应该改成 RunAsync、StartNewAsync、ContinueWithAsync。 - Alexander Danilov
@TheodorZoulias 此外,实际的入队操作是同步进行的,因此在方法完成后,动作已经被入队。 - Alexander Danilov
如果“Enqueue”同步运行,则返回“Task”没有意义。这个“Task”应该代表什么?哪些异步完成? - Theodor Zoulias
内置方法 HttpClient.SendAsync 是同步还是异步的,为什么? - Theodor Zoulias
显示剩余8条评论

0

你可以在 .NET 3.5 上使用Rx。它可能从未退出RC版,但我相信它是稳定的,并被许多生产系统使用。如果你不需要Subject,你可能会发现.NET 3.5的基本元素(如并发集合)可以使用,这些元素直到4.0才与.NET Framework一起发布。

.net 3.5下Rx(反应式扩展)的替代方法

* - 挑剔者的角落:除了先进的时间窗口(超出范围),但缓冲区(按计数和时间)、排序和调度程序都是稳定的。


0

谢谢您的建议。不幸的是,我只能使用 .Net 3.5,所以这些都不符合要求。抱歉,我应该在需求中提到这一点 ;) - user1300560

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接