假设我们有一个浏览器,我们想要保存最近100个浏览过的URL。我想要一个队列,在容量达到上限(历史记录中有100个地址)时,自动删除(出队)最旧的(第一个)条目,并在插入新条目(入队)时。
如何使用System.Collections来实现这个功能?
public class FixedSizedQueue<T>
{
readonly ConcurrentQueue<T> q = new ConcurrentQueue<T>();
private object lockObject = new object();
public int Limit { get; set; }
public void Enqueue(T obj)
{
q.Enqueue(obj);
lock (lockObject)
{
T overflow;
while (q.Count > Limit && q.TryDequeue(out overflow)) ;
}
}
}
q
对象的私有性使得 lock
可以防止其他线程同时访问。 - Richard SchneiderConcurrentQueue<T>
对象替换为更轻量的 Queue<T>
对象是一个好主意。 - 0b101010Enqueue
函数外,所有其他函数仍将调用原始队列。换句话说,尽管这个答案被标记为已接受,但它是完全错误的。 - Gábor我会选择稍微改变一下…扩展ConcurrentQueue以便能够在FixedSizeQueue上使用Linq扩展
public class FixedSizedQueue<T> : ConcurrentQueue<T>
{
private readonly object syncObject = new object();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public new void Enqueue(T obj)
{
base.Enqueue(obj);
lock (syncObject)
{
while (base.Count > Size)
{
T outObj;
base.TryDequeue(out outObj);
}
}
}
}
以下是一些基于Richard Schneider上面的答案的可用代码,希望对大家有所帮助:
public class FixedSizedQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizedQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
queue.Enqueue(obj);
while (queue.Count > Size)
{
T outObj;
queue.TryDequeue(out outObj);
}
}
}
就算只是一点点价值,这里有一个轻量级的循环缓冲区,其中一些方法被标记为安全和不安全使用。
public class CircularBuffer<T> : IEnumerable<T>
{
readonly int size;
readonly object locker;
int count;
int head;
int rear;
T[] values;
public CircularBuffer(int max)
{
this.size = max;
locker = new object();
count = 0;
head = 0;
rear = 0;
values = new T[size];
}
static int Incr(int index, int size)
{
return (index + 1) % size;
}
private void UnsafeEnsureQueueNotEmpty()
{
if (count == 0)
throw new Exception("Empty queue");
}
public int Size { get { return size; } }
public object SyncRoot { get { return locker; } }
#region Count
public int Count { get { return UnsafeCount; } }
public int SafeCount { get { lock (locker) { return UnsafeCount; } } }
public int UnsafeCount { get { return count; } }
#endregion
#region Enqueue
public void Enqueue(T obj)
{
UnsafeEnqueue(obj);
}
public void SafeEnqueue(T obj)
{
lock (locker) { UnsafeEnqueue(obj); }
}
public void UnsafeEnqueue(T obj)
{
values[rear] = obj;
if (Count == Size)
head = Incr(head, Size);
rear = Incr(rear, Size);
count = Math.Min(count + 1, Size);
}
#endregion
#region Dequeue
public T Dequeue()
{
return UnsafeDequeue();
}
public T SafeDequeue()
{
lock (locker) { return UnsafeDequeue(); }
}
public T UnsafeDequeue()
{
UnsafeEnsureQueueNotEmpty();
T res = values[head];
values[head] = default(T);
head = Incr(head, Size);
count--;
return res;
}
#endregion
#region Peek
public T Peek()
{
return UnsafePeek();
}
public T SafePeek()
{
lock (locker) { return UnsafePeek(); }
}
public T UnsafePeek()
{
UnsafeEnsureQueueNotEmpty();
return values[head];
}
#endregion
#region GetEnumerator
public IEnumerator<T> GetEnumerator()
{
return UnsafeGetEnumerator();
}
public IEnumerator<T> SafeGetEnumerator()
{
lock (locker)
{
List<T> res = new List<T>(count);
var enumerator = UnsafeGetEnumerator();
while (enumerator.MoveNext())
res.Add(enumerator.Current);
return res.GetEnumerator();
}
}
public IEnumerator<T> UnsafeGetEnumerator()
{
int index = head;
for (int i = 0; i < count; i++)
{
yield return values[index];
index = Incr(index, size);
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
#endregion
}
Foo()/SafeFoo()/UnsafeFoo()
的约定:
Foo
方法默认调用UnsafeFoo
。UnsafeFoo
方法在没有锁的情况下自由修改状态,它们只能调用其他不安全的方法。SafeFoo
方法在锁内部调用UnsafeFoo
方法。我的版本只是普通队列
的子类,没有什么特别之处,但是看到大家都参与了,而且还与主题标题相符,那我也可以把它放在这里。它还会返回已出队的元素。
public sealed class SizedQueue<T> : Queue<T>
{
public int FixedCapacity { get; }
public SizedQueue(int fixedCapacity)
{
this.FixedCapacity = fixedCapacity;
}
/// <summary>
/// If the total number of item exceed the capacity, the oldest ones automatically dequeues.
/// </summary>
/// <returns>The dequeued value, if any.</returns>
public new T Enqueue(T item)
{
base.Enqueue(item);
if (base.Count > FixedCapacity)
{
return base.Dequeue();
}
return default;
}
}
[Serializable]
[DebuggerDisplay("Count = {" + nameof(Count) + "}, Limit = {" + nameof(Limit) + "}")]
public class FixedSizedQueue<T> : IReadOnlyCollection<T>
{
private readonly Queue<T> _queue = new Queue<T>();
private readonly object _lock = new object();
public int Count { get { lock (_lock) { return _queue.Count; } } }
public int Limit { get; }
public FixedSizedQueue(int limit)
{
if (limit < 1)
throw new ArgumentOutOfRangeException(nameof(limit));
Limit = limit;
}
public FixedSizedQueue(IEnumerable<T> collection)
{
if (collection is null || !collection.Any())
throw new ArgumentException("Can not initialize the Queue with a null or empty collection", nameof(collection));
_queue = new Queue<T>(collection);
Limit = _queue.Count;
}
public void Enqueue(T obj)
{
lock (_lock)
{
_queue.Enqueue(obj);
while (_queue.Count > Limit)
_queue.Dequeue();
}
}
public void Clear()
{
lock (_lock)
_queue.Clear();
}
public IEnumerator<T> GetEnumerator()
{
lock (_lock)
return new List<T>(_queue).GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
}
仅因为还没有人提到它.. 你可以使用LinkedList<T>
并添加线程安全性:
public class Buffer<T> : LinkedList<T>
{
private int capacity;
public Buffer(int capacity)
{
this.capacity = capacity;
}
public void Enqueue(T item)
{
// todo: add synchronization mechanism
if (Count == capacity) RemoveLast();
AddFirst(item);
}
public T Dequeue()
{
// todo: add synchronization mechanism
var last = Last.Value;
RemoveLast();
return last;
}
}
需要注意的是,在此示例中,默认的枚举顺序将会是后进先出(LIFO)。但如有必要,可以进行覆盖。
让我们再添加一个答案。为什么选择它而不是其他呢?
1) 简单易用。试图保证大小固然很好,但会带来不必要的复杂性,可能会引发其它问题。
2) 实现了IReadOnlyCollection接口,这意味着您可以在其上使用Linq,并将其传递给期望IEnumerable的各种方法。
3) 不使用锁。以上的许多解决方案使用锁,但在无锁集合中是不正确的。
4) 实现了与ConcurrentQueue相同的一组方法、属性和接口,包括IProducerConsumerCollection。如果您想要将该集合与BlockingCollection一起使用,那么这一点非常重要。
如果TryDequeue失败,这个实现可能会产生比预期更多的条目,但那种情况发生的频率似乎不值得编写专门的代码来解决,因为那样只会影响性能并引发意外的问题。
如果您确实想要保证集合大小,实现Prune()或类似的方法似乎是最好的想法。您可以在其它方法(包括TryDequeue)中使用ReaderWriterLockSlim读取锁,在修剪时才使用写入锁。
class ConcurrentFixedSizeQueue<T> : IProducerConsumerCollection<T>, IReadOnlyCollection<T>, ICollection {
readonly ConcurrentQueue<T> m_concurrentQueue;
readonly int m_maxSize;
public int Count => m_concurrentQueue.Count;
public bool IsEmpty => m_concurrentQueue.IsEmpty;
public ConcurrentFixedSizeQueue (int maxSize) : this(Array.Empty<T>(), maxSize) { }
public ConcurrentFixedSizeQueue (IEnumerable<T> initialCollection, int maxSize) {
if (initialCollection == null) {
throw new ArgumentNullException(nameof(initialCollection));
}
m_concurrentQueue = new ConcurrentQueue<T>(initialCollection);
m_maxSize = maxSize;
}
public void Enqueue (T item) {
m_concurrentQueue.Enqueue(item);
if (m_concurrentQueue.Count > m_maxSize) {
T result;
m_concurrentQueue.TryDequeue(out result);
}
}
public void TryPeek (out T result) => m_concurrentQueue.TryPeek(out result);
public bool TryDequeue (out T result) => m_concurrentQueue.TryDequeue(out result);
public void CopyTo (T[] array, int index) => m_concurrentQueue.CopyTo(array, index);
public T[] ToArray () => m_concurrentQueue.ToArray();
public IEnumerator<T> GetEnumerator () => m_concurrentQueue.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator () => GetEnumerator();
// Explicit ICollection implementations.
void ICollection.CopyTo (Array array, int index) => ((ICollection)m_concurrentQueue).CopyTo(array, index);
object ICollection.SyncRoot => ((ICollection) m_concurrentQueue).SyncRoot;
bool ICollection.IsSynchronized => ((ICollection) m_concurrentQueue).IsSynchronized;
// Explicit IProducerConsumerCollection<T> implementations.
bool IProducerConsumerCollection<T>.TryAdd (T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryAdd(item);
bool IProducerConsumerCollection<T>.TryTake (out T item) => ((IProducerConsumerCollection<T>) m_concurrentQueue).TryTake(out item);
public override int GetHashCode () => m_concurrentQueue.GetHashCode();
public override bool Equals (object obj) => m_concurrentQueue.Equals(obj);
public override string ToString () => m_concurrentQueue.ToString();
}
这要看具体使用场景,我注意到有些解决方案在多线程环境下可能会超出尺寸。无论如何,我的应用场景是显示最近的5个事件,有多个线程将事件写入队列,另一个线程从中读取并在Winform控件中显示。所以这是我的解决方案。
编辑:由于我们已经在实现中使用了锁定,所以实际上不需要ConcurrentQueue,它可能会提高性能。
class FixedSizedConcurrentQueue<T>
{
readonly Queue<T> queue = new Queue<T>();
readonly object syncObject = new object();
public int MaxSize { get; private set; }
public FixedSizedConcurrentQueue(int maxSize)
{
MaxSize = maxSize;
}
public void Enqueue(T obj)
{
lock (syncObject)
{
queue.Enqueue(obj);
while (queue.Count > MaxSize)
{
queue.Dequeue();
}
}
}
public T[] ToArray()
{
T[] result = null;
lock (syncObject)
{
result = queue.ToArray();
}
return result;
}
public void Clear()
{
lock (syncObject)
{
queue.Clear();
}
}
}
编辑:在上面的示例中,我们实际上不需要syncObject
,我们可以使用queue
对象,因为我们没有在任何函数中重新初始化queue
,而且它已被标记为readonly
。
仅供娱乐,以下是我认为可以解决大部分评论者所担忧的另一种实现方式。特别地,线程安全是通过无需锁定来实现的,而且该实现被包装类隐藏起来。
public class FixedSizeQueue<T> : IReadOnlyCollection<T>
{
private ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
private int _count;
public int Limit { get; private set; }
public FixedSizeQueue(int limit)
{
this.Limit = limit;
}
public void Enqueue(T obj)
{
_queue.Enqueue(obj);
Interlocked.Increment(ref _count);
// Calculate the number of items to be removed by this thread in a thread safe manner
int currentCount;
int finalCount;
do
{
currentCount = _count;
finalCount = Math.Min(currentCount, this.Limit);
} while (currentCount !=
Interlocked.CompareExchange(ref _count, finalCount, currentCount));
T overflow;
while (currentCount > finalCount && _queue.TryDequeue(out overflow))
currentCount--;
}
public int Count
{
get { return _count; }
}
public IEnumerator<T> GetEnumerator()
{
return _queue.GetEnumerator();
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return _queue.GetEnumerator();
}
}
_queue.Enqueue(obj)
之后但在 Interlocked.Increment(ref _count)
之前被抢占,而另一个线程调用了 .Count
,那么它将得到错误的计数。我还没有检查其他问题。 - KFL