
关于生产者保持队列中项目数量不变,我从这个问题中找到了一个潜在的想法: 自动在新的入队时出队旧值的固定大小队列 我目前正在使用一个包装器类(基于那个答案)来封装一个ConcurrentQueue,并具有以下Enqueue()方法:
public class FixedSizeQueue<T>
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
        Size = size;

    public void Enqueue(T obj)
        // add item to the queue

        lock (this) // lock queue so that queue.Count is reliable
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
                T objOut;
                queue.TryDequeue(out objOut);


FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit


你为什么关心快照?同时添加项目有什么问题吗?如果你担心会抓取太多,那就在消费者中停止于大小。 - paparazzo

简单来说,ConcurrentQueue有一个 "ToArray" 方法,当输入时,将锁定集合并生成队列中所有当前项的 "快照"。如果您希望为消费者提供一组要处理的内容块,可以锁定与入队方法相同的对象,调用ToArray(),然后通过一个 while(!queue.IsEmpty) queue.TryDequeue(out trash) 循环清空队列,在返回您提取的数组之前。
这将是您的 GetAll() 方法:
public T[] GetAll()
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);



  • NEVER use lock(this). You never know what other objects may be using your object as a locking focus, and thus would be blocked when the object locks itself from the inside. The best practice is to lock a privately scoped object instance, usually one created just to be locked: private readonly object syncObj = new object();

  • Since you're locking critical sections of your wrapper anyway, I would use an ordinary List<T> instead of a concurrent collection. Access is faster, it's more easily cleaned out, so you'll be able to do what you're doing much more simply than ConcurrentQueue allows. To enqueue, lock the sync object, Insert() before index zero, then remove any items from index Size to the list's current Count using RemoveRange(). To dequeue, lock the same sync object, call myList.ToArray() (from the Linq namespace; does pretty much the same thing as ConcurrentQueue's does) and then call myList.Clear() before returning the array. Couldn't be simpler:

    public class FixedSizeQueue<T>
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    public int Size { get; private set; }
    public FixedSizeQueue(int size) { Size = size; }
    public void Enqueue(T obj)
        lock (syncObj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
    public T[] Dequeue()
        lock (syncObj)
            var result = queue.ToArray();
            return result;
  • You seem to understand that you are throwing enqueued items away using this model. That's usually not a good thing, but I'm willing to give you the benefit of the doubt. However, I will say there is a lossless way to achieve this, using a BlockingCollection. A BlockingCollection wraps any IProducerConsumerCollection including most System.Collections.Concurrent classes, and allows you to specify a maximum capacity for the queue. The collection will then block any thread attempting to dequeue from an empty queue, or any thread attempting to add to a full queue, until items have been added or removed such that there is something to get or room to insert. This is the best way to implement a producer-consumer queue with a maximum size, or one that would otherwise require "polling" to see if there's something for the consumer to work on. If you go this route, only the ones the consumer has to throw away are thrown away; the consumer will see all the rows the producer puts in and makes its own decision about each.

嗨,Keith。非常感谢您的出色答案。我从未考虑过使用List,可能是因为存储项目的方式似乎对我来说有些“反向”,但这是一个很好的解决方案。我现在已经实施了它,而且它运行得非常好。关于放弃排队的项目,我同意这肯定不是你所期望的,但这是我得到的要求(实际上有一个原因,但我不能详细说明)。再次感谢您出色的答案。 - bmt22033




建议您需要以某种方式“等待”或“通知”消费者有关该项目的可用性。在这种情况下,考虑使用 BlockingCollection<T> 代替。

他正在锁定关键部分,因为他正在同时执行多个操作;ConcurrentQueue将锁定集合,执行添加或删除一个元素的操作,然后释放锁。这意味着如果生产者正在尝试添加而消费者正在清除元素,那么不必要的东西将会丢失。尽管如此,我也不同意他的模式。 - KeithS

