BlockingCollection最大大小

6

我知道使用 ConcurrentQueue 的 BlockingCollection 具有容量限制,上限为 100。

然而,我不确定这是什么意思。

我想实现一个并发缓存,并且可以 dequeue、可以在队列大小过大时进行 deque/enque 操作(即当缓存溢出时丢弃消息)。是否有办法使用 boundedcapacity 来实现这一点,或者更好的方法是手动实现还是创建一个新的集合。

基本上,我有一个读取线程和几个写入线程。如果队列中的数据是所有写入者中最新的话,那就最好了。


也许你需要一种后台事件来通知作者最新的数据? - NoWar
3个回答

3

有界容量N意味着如果队列已经包含了N个项目,任何试图添加另一个项目的线程都会被阻塞,直到不同的线程删除一个项目。

你似乎想要的是一个不同的概念——你希望最近添加的项目成为消费线程出队的第一个项目。

您可以通过使用ConcurrentStack而不是ConcurrentQueue作为底层存储来实现这一点。

您将使用此构造函数并传入一个ConcurrentStack

例如:

var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

使用ConcurrentStack,可以确保消费线程出队的每个项都是该时刻队列中最新的项。
还要注意,如果为阻塞集合指定了一个上限,可以使用BlockingCollection.TryAdd(),如果在调用时集合已满,则返回false

基于我的最新陈述是正确的。然而,我仍然希望队列按照它们被排队的顺序进行调度。似乎最好在每次添加之前执行以下操作: if(q.count>x) {q.take();//donothing} q.add(newitem) 以保持大小不变,但是如果另一个线程在计算长度后进行了取走,则会不必要地删除消息。 - maxfridbe
嗯,我不认为我真正理解你的要求...但是如果你为阻塞集合指定一个上限,你可以使用BlockingCollection.TryAdd(),如果队列已满,它将返回false - 你能用这个吗? - Matthew Watson

3
我觉得你想要构建类似于MRU(最近使用)缓存的东西。BlockingCollection不是实现这个目的的最佳方式。
我建议你使用LinkedList。它不是线程安全的,所以你需要提供自己的同步机制,但这并不太难。你的enqueue方法应该像这样:
LinkedList<MyType> TheQueue = new LinkedList<MyType>();
object listLock = new object();

void Enqueue(MyType item)
{
    lock (listLock)
    {
        TheQueue.AddFirst(item);
        while (TheQueue.Count > MaxQueueSize)
        {
            // Queue overflow. Reduce to max size.
            TheQueue.RemoveLast();
        }
    }
}

而出队则更加简单:

MyType Dequeue()
{
    lock (listLock)
    {
        return (TheQueue.Count > 0) ? TheQueue.RemoveLast() : null;
    }
}

如果您想让消费者在队列上进行非繁忙等待,那么涉及的内容就有点多了。您可以使用Monitor.Wait和Monitor.Pulse来实现。请参见Monitor.Pulse页面上的示例。
更新:
我想到您也可以使用循环缓冲区(数组)来实现同样的功能。只需维护头指针和尾指针即可。您可以在head处插入,在tail处删除。如果要插入数据时head == tail,则需要增加tail,这实际上相当于删除前一个tail项。

2
如果您想要一个自定义的 BlockingCollection,它可以容纳最新的 N 个元素,并在满时删除最旧的元素,那么您可以很容易地基于 Channel<T> 创建一个。虽然 Channels 旨在用于异步场景,但使它们阻塞消费者是微不足道的,并且即使在安装了 SynchronizationContext 的环境中使用,也不会引起任何不良副作用(如死锁)。请注意保留 HTML 标签。
public class MostRecentBlockingCollection<T>
{
    private readonly Channel<T> _channel;

    public MostRecentBlockingCollection(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
        });
    }

    public bool IsCompleted => _channel.Reader.Completion.IsCompleted;

    public void Add(T item)
        => _channel.Writer.WriteAsync(item).AsTask().GetAwaiter().GetResult();

    public T Take()
        => _channel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();

    public void CompleteAdding() => _channel.Writer.Complete();

    public IEnumerable<T> GetConsumingEnumerable()
    {
        while (_channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
            while (_channel.Reader.TryRead(out var item))
                yield return item;
    }
}

MostRecentBlockingCollection 类仅会阻塞消费者。生产者可以随时向集合中添加项目,可能导致一些之前添加的元素被丢弃。

添加取消支持应该很简单,因为 Channel<T> API 已经支持它。添加超时支持则较不容易,但应该也不难实现。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接