我知道使用 ConcurrentQueue 的 BlockingCollection 具有容量限制,上限为 100。
然而,我不确定这是什么意思。
我想实现一个并发缓存,并且可以 dequeue、可以在队列大小过大时进行 deque/enque 操作(即当缓存溢出时丢弃消息)。是否有办法使用 boundedcapacity 来实现这一点,或者更好的方法是手动实现还是创建一个新的集合。
基本上,我有一个读取线程和几个写入线程。如果队列中的数据是所有写入者中最新的话,那就最好了。
我知道使用 ConcurrentQueue 的 BlockingCollection 具有容量限制,上限为 100。
然而,我不确定这是什么意思。
我想实现一个并发缓存,并且可以 dequeue、可以在队列大小过大时进行 deque/enque 操作(即当缓存溢出时丢弃消息)。是否有办法使用 boundedcapacity 来实现这一点,或者更好的方法是手动实现还是创建一个新的集合。
基本上,我有一个读取线程和几个写入线程。如果队列中的数据是所有写入者中最新的话,那就最好了。
有界容量N意味着如果队列已经包含了N个项目,任何试图添加另一个项目的线程都会被阻塞,直到不同的线程删除一个项目。
你似乎想要的是一个不同的概念——你希望最近添加的项目成为消费线程出队的第一个项目。
您可以通过使用ConcurrentStack
而不是ConcurrentQueue作为底层存储来实现这一点。
您将使用此构造函数
并传入一个ConcurrentStack
。
例如:
var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());
ConcurrentStack
,可以确保消费线程出队的每个项都是该时刻队列中最新的项。BlockingCollection.TryAdd()
,如果在调用时集合已满,则返回false
。BlockingCollection.TryAdd()
,如果队列已满,它将返回false - 你能用这个吗? - Matthew WatsonBlockingCollection
不是实现这个目的的最佳方式。LinkedList<MyType> TheQueue = new LinkedList<MyType>();
object listLock = new object();
void Enqueue(MyType item)
{
lock (listLock)
{
TheQueue.AddFirst(item);
while (TheQueue.Count > MaxQueueSize)
{
// Queue overflow. Reduce to max size.
TheQueue.RemoveLast();
}
}
}
而出队则更加简单:
MyType Dequeue()
{
lock (listLock)
{
return (TheQueue.Count > 0) ? TheQueue.RemoveLast() : null;
}
}
BlockingCollection
,它可以容纳最新的 N 个元素,并在满时删除最旧的元素,那么您可以很容易地基于 Channel<T>
创建一个。虽然 Channels 旨在用于异步场景,但使它们阻塞消费者是微不足道的,并且即使在安装了 SynchronizationContext
的环境中使用,也不会引起任何不良副作用(如死锁)。请注意保留 HTML 标签。public class MostRecentBlockingCollection<T>
{
private readonly Channel<T> _channel;
public MostRecentBlockingCollection(int capacity)
{
_channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
});
}
public bool IsCompleted => _channel.Reader.Completion.IsCompleted;
public void Add(T item)
=> _channel.Writer.WriteAsync(item).AsTask().GetAwaiter().GetResult();
public T Take()
=> _channel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();
public void CompleteAdding() => _channel.Writer.Complete();
public IEnumerable<T> GetConsumingEnumerable()
{
while (_channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
while (_channel.Reader.TryRead(out var item))
yield return item;
}
}
MostRecentBlockingCollection
类仅会阻塞消费者。生产者可以随时向集合中添加项目,可能导致一些之前添加的元素被丢弃。
添加取消支持应该很简单,因为 Channel<T>
API 已经支持它。添加超时支持则较不容易,但应该也不难实现。