固定大小的队列,在新的入队操作时自动出队旧的值。

167
我正在使用ConcurrentQueue作为一个共享数据结构,它的目的是保存最近N个传递给它的对象(类似于历史记录)。
假设我们有一个浏览器,我们想要保存最近100个浏览过的URL。我想要一个队列,在容量达到上限(历史记录中有100个地址)时,自动删除(出队)最旧的(第一个)条目,并在插入新条目(入队)时。
如何使用System.Collections来实现这个功能?

1
https://dev59.com/BHRB5IYBdhLWcg3wj32c - Aryabhatta
这并不是针对你特别制作的,而是为了任何遇到这个问题并可能发现它有用的人们。顺便说一句,它也涉及C#。你能在2分钟内读完所有答案,并弄清楚那里没有C#代码吗?无论如何,我自己也不确定,因此这只是一条评论... - Aryabhatta
你可以把方法包裹在锁里。鉴于它们很快,你可以直接锁定整个数组。不过这可能是重复的。使用带有C#代码的循环缓冲实现进行搜索可能会找到一些东西。无论如何,祝你好运。 - Aryabhatta
16个回答

132
我会编写一个包装类,在Enqueue时检查Count,当计数超过限制时执行Dequeue。
 public class FixedSizedQueue<T>
 {
     readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();
     private object lockObject = new object();

     public int Limit { get; set; }
     public void Enqueue(T obj)
     {
        q.Enqueue(obj);
        lock (lockObject)
        {
           T overflow;
           while (q.Count > Limit && q.TryDequeue(out overflow)) ;
        }
     }
 }

4
q 对象的私有性使得 lock 可以防止其他线程同时访问。 - Richard Schneider
24
锁定并不是个好主意。BCL 并发集合的整个目的是为了提供无锁并发以提高性能。你代码中的锁定会影响这一优势。事实上,我看不出你需要对 "deq" 进行锁定的原因。 - KFL
4
@KFL,需要进行锁定,因为“Count”和“TryDequeue”是两个独立的操作,它们没有被BCL Concurrent同步。 - Richard Schneider
14
如果你需要自己处理并发问题,那么将 ConcurrentQueue<T> 对象替换为更轻量的 Queue<T> 对象是一个好主意。 - 0b101010
6
不要定义自己的队列,只需使用继承的队列。如果你像现在这样做,实际上就不能对队列值进行任何其他操作,除了你的新Enqueue函数外,所有其他函数仍将调用原始队列。换句话说,尽管这个答案被标记为已接受,但它是完全错误的。 - Gábor
显示剩余8条评论

116

我会选择稍微改变一下…扩展ConcurrentQueue以便能够在FixedSizeQueue上使用Linq扩展

public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
    private readonly object syncObject = new object();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public new void Enqueue(T obj)
    {
        base.Enqueue(obj);
        lock (syncObject)
        {
            while (base.Count > Size)
            {
                T outObj;
                base.TryDequeue(out outObj);
            }
        }
    }
}

4
当某人将实例静态知道为ConcurrentQueue<T>时,他们刚刚绕过了你的'new'关键字。 - mhand
9
如果“某人”想要这样做,那么他们本来就会选择使用ConcurrentQueue<T>对象……这是一种自定义的存储类。没有人希望将其提交到.NET框架中。你只是为了创造问题而刻意寻求它。 - Dave Lawrence
19
我的观点是,与其使用子类化,也许你只需要将队列封装起来...这样可以在所有情况下强制执行所需的行为。此外,由于它是一种自定义的存储类,让我们完全自定义它,只公开我们需要的操作,我认为在这里使用子类化是错误的工具。 - mhand
3
我明白你的意思了,我可以包装一个队列并公开队列的枚举器,以便使用 Linq 扩展。 - Dave Lawrence
4
我同意@mhand的观点,你不应该继承ConcurrentQueue,因为Enqueue方法不是虚拟的。如果需要,你应该代理队列并实现整个接口。 - Chris Marisic
显示剩余6条评论

32

以下是一些基于Richard Schneider上面的答案的可用代码,希望对大家有所帮助:

public class FixedSizedQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizedQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        queue.Enqueue(obj);

        while (queue.Count > Size)
        {
            T outObj;
            queue.TryDequeue(out outObj);
        }
    }
}

3
投票反对,原因是在使用ConcurrentQueue时锁定很糟糕,此外还没有实现任何必需的接口,以使其成为真正的集合。 - Josh

16

就算只是一点点价值,这里有一个轻量级的循环缓冲区,其中一些方法被标记为安全和不安全使用。

public class CircularBuffer<T> : IEnumerable<T>
{
    readonly int size;
    readonly object locker;

    int count;
    int head;
    int rear;
    T[] values;

    public CircularBuffer(int max)
    {
        this.size = max;
        locker = new object();
        count = 0;
        head = 0;
        rear = 0;
        values = new T[size];
    }

    static int Incr(int index, int size)
    {
        return (index + 1) % size;
    }

    private void UnsafeEnsureQueueNotEmpty()
    {
        if (count == 0)
            throw new Exception("Empty queue");
    }

    public int Size { get { return size; } }
    public object SyncRoot { get { return locker; } }

    #region Count

    public int Count { get { return UnsafeCount; } }
    public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
    public int UnsafeCount { get { return count; } }

    #endregion

    #region Enqueue

    public void Enqueue(T obj)
    {
        UnsafeEnqueue(obj);
    }

    public void SafeEnqueue(T obj)
    {
        lock (locker) { UnsafeEnqueue(obj); }
    }

    public void UnsafeEnqueue(T obj)
    {
        values[rear] = obj;

        if (Count == Size)
            head = Incr(head, Size);
        rear = Incr(rear, Size);
        count = Math.Min(count + 1, Size);
    }

    #endregion

    #region Dequeue

    public T Dequeue()
    {
        return UnsafeDequeue();
    }

    public T SafeDequeue()
    {
        lock (locker) { return UnsafeDequeue(); }
    }

    public T UnsafeDequeue()
    {
        UnsafeEnsureQueueNotEmpty();

        T res = values[head];
        values[head] = default(T);
        head = Incr(head, Size);
        count--;

        return res;
    }

    #endregion

    #region Peek

    public T Peek()
    {
        return UnsafePeek();
    }

    public T SafePeek()
    {
        lock (locker) { return UnsafePeek(); }
    }

    public T UnsafePeek()
    {
        UnsafeEnsureQueueNotEmpty();

        return values[head];
    }

    #endregion


    #region GetEnumerator

    public IEnumerator<T> GetEnumerator()
    {
        return UnsafeGetEnumerator();
    }

    public IEnumerator<T> SafeGetEnumerator()
    {
        lock (locker)
        {
            List<T> res = new List<T>(count);
            var enumerator = UnsafeGetEnumerator();
            while (enumerator.MoveNext())
                res.Add(enumerator.Current);
            return res.GetEnumerator();
        }
    }

    public IEnumerator<T> UnsafeGetEnumerator()
    {
        int index = head;
        for (int i = 0; i < count; i++)
        {
            yield return values[index];
            index = Incr(index, size);
        }
    }

    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    #endregion
}

我喜欢使用Foo()/SafeFoo()/UnsafeFoo()的约定:
  • Foo方法默认调用UnsafeFoo
  • UnsafeFoo方法在没有锁的情况下自由修改状态,它们只能调用其他不安全的方法。
  • SafeFoo方法在锁内部调用UnsafeFoo方法。
这可能有点啰嗦,但它可以使明显的错误更加明显,例如在一个应该是线程安全的方法中,在没有锁的情况下调用不安全的方法。请注意保留HTML标签。

11

我的版本只是普通队列的子类,没有什么特别之处,但是看到大家都参与了,而且还与主题标题相符,那我也可以把它放在这里。它还会返回已出队的元素。

public sealed class SizedQueue<T> : Queue<T>
{
    public int FixedCapacity { get; }
    public SizedQueue(int fixedCapacity)
    {
        this.FixedCapacity = fixedCapacity;
    }

    /// <summary>
    /// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
    /// </summary>
    /// <returns>The dequeued value, if any.</returns>
    public new T Enqueue(T item)
    {
        base.Enqueue(item);
        if (base.Count > FixedCapacity)
        {
            return base.Dequeue();
        }
        return default;
    }
}

6
这是我对固定大小队列的看法。
它使用常规队列,以避免在使用ConcurrentQueue上的Count属性时出现同步开销。它还实现了IReadOnlyCollection,以便可以使用LINQ方法。其余部分与其他答案非常相似。
[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
    private readonly Queue<T> _queue = new Queue<T>();
    private readonly object _lock = new object();

    public int Count { get { lock (_lock) { return _queue.Count; } } }
    public int Limit { get; }

    public FixedSizedQueue(int limit)
    {
        if (limit < 1)
            throw new ArgumentOutOfRangeException(nameof(limit));

        Limit = limit;
    }

    public FixedSizedQueue(IEnumerable<T> collection)
    {
        if (collection is null || !collection.Any())
           throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));

        _queue = new Queue<T>(collection);
        Limit = _queue.Count;
    }

    public void Enqueue(T obj)
    {
        lock (_lock)
        {
            _queue.Enqueue(obj);

            while (_queue.Count > Limit)
                _queue.Dequeue();
        }
    }

    public void Clear()
    {
        lock (_lock)
            _queue.Clear();
    }

    public IEnumerator<T> GetEnumerator()
    {
        lock (_lock)
            return new List<T>(_queue).GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}

5

仅因为还没有人提到它.. 你可以使用LinkedList<T>并添加线程安全性:

public class Buffer<T> : LinkedList<T>
{
    private int capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;   
    }

    public void Enqueue(T item)
    {
        // todo: add synchronization mechanism
        if (Count == capacity) RemoveLast();
        AddFirst(item);
    }

    public T Dequeue()
    {
        // todo: add synchronization mechanism
        var last = Last.Value;
        RemoveLast();
        return last;
    }
}

需要注意的是,在此示例中,默认的枚举顺序将会是后进先出(LIFO)。但如有必要,可以进行覆盖。


4

让我们再添加一个答案。为什么选择它而不是其他呢?

1) 简单易用。试图保证大小固然很好,但会带来不必要的复杂性,可能会引发其它问题。

2) 实现了IReadOnlyCollection接口,这意味着您可以在其上使用Linq,并将其传递给期望IEnumerable的各种方法。

3) 不使用锁。以上的许多解决方案使用锁,但在无锁集合中是不正确的。

4) 实现了与ConcurrentQueue相同的一组方法、属性和接口,包括IProducerConsumerCollection。如果您想要将该集合与BlockingCollection一起使用,那么这一点非常重要。

如果TryDequeue失败,这个实现可能会产生比预期更多的条目,但那种情况发生的频率似乎不值得编写专门的代码来解决,因为那样只会影响性能并引发意外的问题。

如果您确实想要保证集合大小,实现Prune()或类似的方法似乎是最好的想法。您可以在其它方法(包括TryDequeue)中使用ReaderWriterLockSlim读取锁,在修剪时才使用写入锁。

class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
    readonly ConcurrentQueue<T> m_concurrentQueue;
    readonly int m_maxSize;

    public int Count => m_concurrentQueue.Count;
    public bool IsEmpty => m_concurrentQueue.IsEmpty;

    public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }

    public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
        if (initialCollection == null) {
            throw new ArgumentNullException(nameof(initialCollection));
        }

        m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
        m_maxSize = maxSize;
    }

    public void Enqueue (T item) {
        m_concurrentQueue.Enqueue(item);

        if (m_concurrentQueue.Count > m_maxSize) {
            T result;
            m_concurrentQueue.TryDequeue(out result);
        }
    }

    public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
    public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);

    public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
    public T[] ToArray () => m_concurrentQueue.ToArray();

    public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
    IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();

    // Explicit ICollection implementations.
    void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
    object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
    bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;

    // Explicit IProducerConsumerCollection<T> implementations.
    bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
    bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);

    public override int GetHashCode () => m_concurrentQueue.GetHashCode();
    public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
    public override string ToString () => m_concurrentQueue.ToString();
}

4

这要看具体使用场景,我注意到有些解决方案在多线程环境下可能会超出尺寸。无论如何,我的应用场景是显示最近的5个事件,有多个线程将事件写入队列,另一个线程从中读取并在Winform控件中显示。所以这是我的解决方案。

编辑:由于我们已经在实现中使用了锁定,所以实际上不需要ConcurrentQueue,它可能会提高性能。

class FixedSizedConcurrentQueue<T> 
{
    readonly Queue<T> queue = new Queue<T>();
    readonly object syncObject = new object();

    public int MaxSize { get; private set; }

    public FixedSizedConcurrentQueue(int maxSize)
    {
        MaxSize = maxSize;
    }

    public void Enqueue(T obj)
    {
        lock (syncObject)
        {
            queue.Enqueue(obj);
            while (queue.Count > MaxSize)
            {
                queue.Dequeue();
            }
        }
    }

    public T[] ToArray()
    {
        T[] result = null;
        lock (syncObject)
        {
            result = queue.ToArray();
        }

        return result;
    }

    public void Clear()
    {
        lock (syncObject)
        {
            queue.Clear();
        }
    }
}

编辑:在上面的示例中,我们实际上不需要syncObject,我们可以使用queue对象,因为我们没有在任何函数中重新初始化queue,而且它已被标记为readonly


3

仅供娱乐,以下是我认为可以解决大部分评论者所担忧的另一种实现方式。特别地,线程安全是通过无需锁定来实现的,而且该实现被包装类隐藏起来。

public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
  private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
  private int _count;

  public int Limit { get; private set; }

  public FixedSizeQueue(int limit)
  {
    this.Limit = limit;
  }

  public void Enqueue(T obj)
  {
    _queue.Enqueue(obj);
    Interlocked.Increment(ref _count);

    // Calculate the number of items to be removed by this thread in a thread safe manner
    int currentCount;
    int finalCount;
    do
    {
      currentCount = _count;
      finalCount = Math.Min(currentCount, this.Limit);
    } while (currentCount != 
      Interlocked.CompareExchange(ref _count, finalCount, currentCount));

    T overflow;
    while (currentCount > finalCount && _queue.TryDequeue(out overflow))
      currentCount--;
  }

  public int Count
  {
    get { return _count; }
  }

  public IEnumerator<T> GetEnumerator()
  {
    return _queue.GetEnumerator();
  }

  System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
  {
    return _queue.GetEnumerator();
  }
}

2
如果同时使用,这段代码会出问题——假设一个线程在调用 _queue.Enqueue(obj) 之后但在 Interlocked.Increment(ref _count) 之前被抢占,而另一个线程调用了 .Count,那么它将得到错误的计数。我还没有检查其他问题。 - KFL

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接