具有固定大小FIFO队列的生产者/消费者模式

6
我需要在一个固定大小的FIFO队列中实现生产者/消费者模式。我认为在ConcurrentQueue周围包装一个类可能会起作用,但我不完全确定(而且我以前从未使用过ConcurrentQueue)。这其中的难点是队列只需要容纳固定数量的项(在我的情况下是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的消费者任务运行时,它需要出队此时队列中存在的所有项并对其进行处理。
值得一提的是,由我的消费者对队列中排队的项目进行处理仅仅是通过SOAP将它们上传到一个不完全可靠的Web应用程序。如果无法建立连接或SOAP调用失败,则应该放弃这些项目并返回队列以获取更多。由于SOAP的开销,我试图最大化可以在一个SOAP调用中发送的队列项目数。
有时,我的生产者可能会比我的消费者更快地添加项目,并且处理它们。如果队列已经满了,而我的生产者需要添加另一个项目,我需要将新项目入队,但然后出队最旧的项目,以使队列的大小保持不变。基本上,我需要始终保留在队列中生成的最新项目(即使这意味着某些项目没有被消耗,因为我的消费者目前正在处理先前的项目)。
关于生产者保持队列中项目数量不变,我从这个问题中找到了一个潜在的想法: 自动在新的入队时出队旧值的固定大小队列 我目前正在使用一个包装器类(基于那个答案)来封装一个ConcurrentQueue,并具有以下Enqueue()方法:
public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我创建了一个该类的实例,并设置了队列大小限制,代码如下:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动了生产者任务,它开始填充队列。我的Enqueue()方法的代码似乎在添加项目导致队列计数超过最大大小时正确地删除了最旧的项目。现在我需要我的消费者任务出队并处理项目,但这就是我的大脑混乱的地方。实现一个Dequeue方法的最佳方式是什么,以便消费者可以在某个时间点对队列进行快照,并出队所有项目进行处理(在此过程中,生产者可能仍在向队列添加项目)?

你为什么关心快照?同时添加项目有什么问题吗?如果你担心会抓取太多,那就在消费者中停止于大小。 - paparazzo
2个回答

7
简单来说,ConcurrentQueue有一个 "ToArray" 方法,当输入时,将锁定集合并生成队列中所有当前项的 "快照"。如果您希望为消费者提供一组要处理的内容块,可以锁定与入队方法相同的对象,调用ToArray(),然后通过一个 while(!queue.IsEmpty) queue.TryDequeue(out trash) 循环清空队列,在返回您提取的数组之前。
这将是您的 GetAll() 方法:
public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

既然您需要清除队列,您可以将两个操作简单地合并;创建一个适当大小的数组(使用queue.Count),然后在队列不为空的情况下,出队一个项目并将其放入数组中,最后返回数组。

现在,这是针对特定问题的答案。现在我必须良心地戴上我的CodeReview.SE帽子,并指出几件事:

  • NEVER use lock(this). You never know what other objects may be using your object as a locking focus, and thus would be blocked when the object locks itself from the inside. The best practice is to lock a privately scoped object instance, usually one created just to be locked: private readonly object syncObj = new object();

  • Since you're locking critical sections of your wrapper anyway, I would use an ordinary List<T> instead of a concurrent collection. Access is faster, it's more easily cleaned out, so you'll be able to do what you're doing much more simply than ConcurrentQueue allows. To enqueue, lock the sync object, Insert() before index zero, then remove any items from index Size to the list's current Count using RemoveRange(). To dequeue, lock the same sync object, call myList.ToArray() (from the Linq namespace; does pretty much the same thing as ConcurrentQueue's does) and then call myList.Clear() before returning the array. Couldn't be simpler:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • You seem to understand that you are throwing enqueued items away using this model. That's usually not a good thing, but I'm willing to give you the benefit of the doubt. However, I will say there is a lossless way to achieve this, using a BlockingCollection. A BlockingCollection wraps any IProducerConsumerCollection including most System.Collections.Concurrent classes, and allows you to specify a maximum capacity for the queue. The collection will then block any thread attempting to dequeue from an empty queue, or any thread attempting to add to a full queue, until items have been added or removed such that there is something to get or room to insert. This is the best way to implement a producer-consumer queue with a maximum size, or one that would otherwise require "polling" to see if there's something for the consumer to work on. If you go this route, only the ones the consumer has to throw away are thrown away; the consumer will see all the rows the producer puts in and makes its own decision about each.


嗨,Keith。非常感谢您的出色答案。我从未考虑过使用List,可能是因为存储项目的方式似乎对我来说有些“反向”,但这是一个很好的解决方案。我现在已经实施了它,而且它运行得非常好。关于放弃排队的项目,我同意这肯定不是你所期望的,但这是我得到的要求(实际上有一个原因,但我不能详细说明)。再次感谢您出色的答案。 - bmt22033

5

不要在this上使用lock。更多详情请参考为什么lock(this)是错误的?

这段代码

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

建议您需要以某种方式“等待”或“通知”消费者有关该项目的可用性。在这种情况下,考虑使用 BlockingCollection<T> 代替。


他正在锁定关键部分,因为他正在同时执行多个操作;ConcurrentQueue将锁定集合,执行添加或删除一个元素的操作,然后释放锁。这意味着如果生产者正在尝试添加而消费者正在清除元素,那么不必要的东西将会丢失。尽管如此,我也不同意他的模式。 - KeithS

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接