在.NET中创建一个阻塞的队列<T>?

171

我有一个场景,其中有多个线程向队列中添加内容,同时也有多个线程从同一队列中读取内容。如果队列达到特定大小,则所有正在填充队列的线程都将被阻塞,直到从队列中删除一个项目。

下面的解决方案是我现在正在使用的,我的问题是:如何改进这个方案?是否有一个对象已经在BCL中启用了此行为,我应该使用它?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}

9
.Net提供了内置类来帮助处理这种情况。这里列出的大部分答案已经过时,最新答案在底部。可以研究线程安全的阻塞式集合。虽然答案可能过时了,但这仍然是一个好问题! - Tom A
我认为即使在.NET中有新的并发类,学习Monitor.Wait/Pulse/PulseAll仍然是一个好主意。 - thewpfguy
1
同意@thewpfguy的观点。你需要理解后台的基本锁定机制。值得注意的是,Systems.Collections.Concurrent直到2010年4月才存在,并且仅在Visual Studio 2010及以上版本中存在。对于仍在使用VS2008的人来说,这绝对不是一个选择... - Vic Colborn
1
如果您正在阅读此内容,请查看 System.Threading.Channels,了解 .NET Core 和 .NET Standard 的多写入器/多读取器、有界、可选阻塞实现。 - Mark Rendle
10个回答

207

那看起来很不安全(同步很少);我们考虑使用这种方式:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

实际上,你需要一种关闭队列的方法,以便读者开始干净地退出-可能像布尔标志这样的东西-如果设置了,空队列只会返回(而不是阻塞):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}

3
为什么选择 SizeQueue,而不是 FixedSizeQueue? - mindless.panda
4
@Lasse - 在Wait期间,它释放锁以便其他线程可以获取它。当它唤醒时,会重新获取这个锁。 - Marc Gravell
1
很好,就像我说的,有些东西我没有理解 :) 这确实让我想重新审视一下我的线程代码... - Lasse V. Karlsen
1
@Lasse - 请注意底部的下拉菜单,其中包含17个主题:http://www.yoda.arachsys.com/csharp/threads/ - Marc Gravell
1
@Stan,这不是死锁;等待会将锁释放给列表中的下一个人,因此在某个时刻读者将获得它,并在该写入程序腾出空间后重新唤醒该写入程序。 - Marc Gravell
显示剩余19条评论

64

15

您可以在 System.Collections.Concurrent 命名空间中使用 BlockingCollectionConcurrentQueue

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}

3
BlockingCollection 默认使用队列(Queue),因此我认为这并不必要。 - Curtis White
1
BlockingCollection是否像队列一样保留顺序? - joelc
1
是的,当使用ConcurrentQueue进行初始化时。 - Andreas

14
“如何改进这个程序?” 你需要查看类中的每个方法,并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么。例如,您在Remove方法中放置了锁定,但没有在Add方法中放置锁定。如果一个线程添加操作同时进行另一个线程删除操作会怎样呢?“坏事情”会发生。 还要考虑一个方法可以返回第二个对象,该对象提供对第一个对象内部数据的访问,例如GetEnumerator。想象一下,一个线程正在遍历该枚举器,另一个线程同时修改该列表。“不好”的结果就是出现问题。 一个好的经验法则是通过将类中的方法数量减少到绝对最小来使此过程更容易实现。特别是,不要继承另一个容器类,因为您将公开该类的所有方法,为调用者提供了一种方法来破坏内部数据,或查看部分更改的数据(同样糟糕,因为数据在那一刻似乎已损坏)。隐藏所有细节,并且对允许访问它们的方式非常无情地进行控制。 我强烈建议您使用现成的解决方案-获取有关线程的书籍或使用第三方库。否则,鉴于您正在尝试的内容,您将花费很长时间调试代码。 此外,Remove方法返回一个项(例如,最先添加的项,因为它是队列),而不是调用者选择特定项,这是否更有意义呢?当队列为空时,也许Remove方法应该同时阻塞。 更新:Marc的答案实际上实现了所有这些建议! :) 但我会将其保留在此处,因为它可能有助于理解他的版本为什么是如此改进。

6

我刚使用响应式扩展编写了这个,并想起了这个问题:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

虽然不一定完全安全,但非常简单。


Subject<t>是什么?我没有它的命名空间的解析器。 - theJerm
这是响应式扩展的一部分。 - Mark Rendle
这不是答案。它根本没有回答问题。 - makhdumi

5

我为一个线程安全的有界阻塞队列编写了以下代码:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}

你能提供一些代码示例来展示如何使用这个库排队一些线程函数吗?包括如何实例化这个类? - theJerm
这个问题/回答有点过时了。你应该查看System.Collections.Concurrent命名空间以获取阻塞队列支持。 - Kevin

2
我还没有完全探索TPL,但它们可能有适合你需要的东西,或者至少有一些反射器可供借鉴灵感。

希望对你有所帮助。

我知道这已经很老了,但我的评论是给SO的新手的,因为OP今天已经知道了这一点。 这不是一个答案,这应该是一个评论。 - John Demetriou

2

0

嗯,你可以看一下 System.Threading.Semaphore 类。除此之外 - 没有,你必须自己制作这个。据我所知,没有这样的内置集合。


我看了一下关于限制访问资源的线程数量的内容,但它不允许您基于某些条件(如Collection.Count)阻止对资源的所有访问。就我所知是这样的。 - Eric Schoonover
好的,这个部分你自己处理,就像现在一样。只是不再使用MaxSize和_FullEvent,而是使用Semaphore,在构造函数中初始化正确的计数。然后,在每次添加/移除操作时,调用WaitForOne()或Release()。 - Vilx-
它与您现在拥有的并没有太大的区别,只是在我看来更加简单。 - Vilx-
你能给我一个示例展示这个工作吗?我没有看到如何动态调整信号量的大小,而这种情况需要。因为只有当队列满时才必须能够阻止所有资源。 - Eric Schoonover
啊,改变大小!你为什么不立刻说呢?好的,那么信号量不适合你。祝你用这种方法好运! - Vilx-
我知道这已经很老了,但我的评论是给SO的新手们的,因为OP今天已经知道了这一点。 这不是一个答案,这应该是一个评论。 - John Demetriou

-1
如果您想要最大的吞吐量,允许多个读取器进行读取,只有一个写入器进行写入,BCL 有一种称为 ReaderWriterLockSlim 的东西,应该有助于简化您的代码...

如果队列已满,我希望任何人都无法写入。 - Eric Schoonover
所以你要和锁一起使用。以下是一些非常好的例子: http://www.albahari.com/threading/part2.aspx#_ProducerConsumerQWaitHandle http://www.albahari.com/threading/part4.aspx - DavidN
3
使用队列/出队操作,每个人都是一个写者……使用独占锁可能更加实用。 - Marc Gravell
我知道这已经很老了,但我的评论是给SO的新手的,因为OP今天已经知道了这一点。 这不是一个答案,这应该是一个评论。 - John Demetriou

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接