值得一提的是,由我的消费者对队列中排队的项目进行处理仅仅是通过SOAP将它们上传到一个不完全可靠的Web应用程序。如果无法建立连接或SOAP调用失败,则应该放弃这些项目并返回队列以获取更多。由于SOAP的开销,我试图最大化可以在一个SOAP调用中发送的队列项目数。
有时,我的生产者可能会比我的消费者更快地添加项目,并且处理它们。如果队列已经满了,而我的生产者需要添加另一个项目,我需要将新项目入队,但然后出队最旧的项目,以使队列的大小保持不变。基本上,我需要始终保留在队列中生成的最新项目(即使这意味着某些项目没有被消耗,因为我的消费者目前正在处理先前的项目)。
关于生产者保持队列中项目数量不变,我从这个问题中找到了一个潜在的想法: 自动在新的入队时出队旧值的固定大小队列 我目前正在使用一个包装器类(基于那个答案)来封装一个ConcurrentQueue,并具有以下Enqueue()方法:
public class FixedSizeQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizeQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
// add item to the queue
queue.Enqueue(obj);
lock (this) // lock queue so that queue.Count is reliable
{
while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
{
T objOut;
queue.TryDequeue(out objOut);
}
}
}
}
我创建了一个该类的实例,并设置了队列大小限制,代码如下:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
我启动了生产者任务,它开始填充队列。我的Enqueue()方法的代码似乎在添加项目导致队列计数超过最大大小时正确地删除了最旧的项目。现在我需要我的消费者任务出队并处理项目,但这就是我的大脑混乱的地方。实现一个Dequeue方法的最佳方式是什么,以便消费者可以在某个时间点对队列进行快照,并出队所有项目进行处理(在此过程中,生产者可能仍在向队列添加项目)?