看到 .Net 4.0 中新增的 System.Collections.Concurrent
命名空间,我感到非常兴奋,这很棒!我见过ConcurrentDictionary
、ConcurrentQueue
、ConcurrentStack
、ConcurrentBag
和 BlockingCollection
。
有一件事情似乎莫名其妙地缺失了,那就是一个 ConcurrentList<T>
。难道我必须自己编写它(或从网络上获取 :))?
这里有什么明显的遗漏吗?
看到 .Net 4.0 中新增的 System.Collections.Concurrent
命名空间,我感到非常兴奋,这很棒!我见过ConcurrentDictionary
、ConcurrentQueue
、ConcurrentStack
、ConcurrentBag
和 BlockingCollection
。
有一件事情似乎莫名其妙地缺失了,那就是一个 ConcurrentList<T>
。难道我必须自己编写它(或从网络上获取 :))?
这里有什么明显的遗漏吗?
我曾经尝试过实现线程安全的、无锁的可调整大小的数组,详见此处(另外还有在GitHub上)。但是我的实现存在一些问题,在此不再赘述。最重要的是,我学到了什么。
首先,你不可能得到一个完全的、无锁的、线程安全的IList<T>
实现。特别是,随机插入和删除是行不通的,除非你也放弃O(1)的随机访问(也就是说,除非你“作弊”,只使用某种形式的链表,并让索引变得低效)。
我认为值得尝试的是一个线程安全的、有限的IList<T>
子集:特别是,它允许一个Add
操作,并提供根据索引的随机只读访问(但没有Insert
、RemoveAt
等,也没有随机写访问)。
这是我的ConcurrentList<T>
实现的目标。但是当我在多线程场景下测试它的性能时,发现List<T>
的简单同步添加更快。基本上,向一个List<T>
添加元素已经非常快了;其中涉及的计算步骤的复杂度微不足道(增加索引并将其赋值给数组中的一个元素,就这样)。你需要大量并发写操作才能看到任何锁争用,即使如此,每个写操作的平均性能仍然能够击败更昂贵、但无锁的ConcurrentList<T>
的实现。
ConcurrentList<T>
集合类型在性能方面保证低开销的场景,每次添加元素都有保障(与平均性能目标相反)。ConcurrentList
最大的优势场景是在列表中添加的活动不是很多,但有许多并发读取器的情况下。可以将读取器的开销减少到单个内存屏障(如果读取器不关心稍微过时的数据,甚至可以消除这个屏障)。 - supercatConcurrentList<T>
,使读者无需任何锁定即可保证看到一致的状态,相对较小的开销就可以实现。当列表从32扩展到64时,保留大小为32的数组并创建一个新的大小为64的数组。在添加下一个32个项目时,将其放入新数组的32-63槽中,并将旧数组中的一个项目复制到新数组中。在添加第64个项目之前,读者将查找0-31项的大小为32的数组和32-63项的大小为64的数组。 - supercat你会用ConcurrentList做什么?
在多线程环境下,随机访问容器的概念并不像它看起来的那样有用。这个说法
if (i < MyConcurrentList.Count)
x = MyConcurrentList[i];
作为一个整体仍然不能保证线程安全。
不要创建ConcurrentList,尝试使用已有的解决方案。最常用的类是 ConcurrentBag 和尤其是 BlockingCollection。
尊重提供的精彩答案,有时我只想要一个线程安全的IList。没有高级的功能或花哨的东西。性能在许多情况下很重要,但有时这并不是需要考虑的问题。是的,“TryGetValue”等方法总会存在挑战,但大多数情况下,我只希望得到一个可以枚举而无需担心将锁放在所有内容周围的东西。是的,有人可能会在我的实现中找到一些“错误”,导致死锁或其他问题(我想),但说实话:当涉及到多线程时,如果您没有正确编写代码,它就会去死锁了。基于此,我决定制作一个简单的ConcurrentList实现,以满足这些基本需求。
值得一提的是:我进行了一个简单的测试,向普通List和ConcurrentList添加了10,000,000个项,结果如下:
List 完成时间: 7793 毫秒。 Concurrent 完成时间: 8064 毫秒。
public class ConcurrentList<T> : IList<T>, IDisposable
{
#region Fields
private readonly List<T> _list;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructors
public ConcurrentList()
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>();
}
public ConcurrentList(int capacity)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(capacity);
}
public ConcurrentList(IEnumerable<T> items)
{
this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this._list = new List<T>(items);
}
#endregion
#region Methods
public void Add(T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Add(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void Insert(int index, T item)
{
try
{
this._lock.EnterWriteLock();
this._list.Insert(index, item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Remove(T item)
{
try
{
this._lock.EnterWriteLock();
return this._list.Remove(item);
}
finally
{
this._lock.ExitWriteLock();
}
}
public void RemoveAt(int index)
{
try
{
this._lock.EnterWriteLock();
this._list.RemoveAt(index);
}
finally
{
this._lock.ExitWriteLock();
}
}
public int IndexOf(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.IndexOf(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void Clear()
{
try
{
this._lock.EnterWriteLock();
this._list.Clear();
}
finally
{
this._lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
try
{
this._lock.EnterReadLock();
return this._list.Contains(item);
}
finally
{
this._lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
try
{
this._lock.EnterReadLock();
this._list.CopyTo(array, arrayIndex);
}
finally
{
this._lock.ExitReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new ConcurrentEnumerator<T>(this._list, this._lock);
}
~ConcurrentList()
{
this.Dispose(false);
}
public void Dispose()
{
this.Dispose(true);
}
private void Dispose(bool disposing)
{
if (disposing)
GC.SuppressFinalize(this);
this._lock.Dispose();
}
#endregion
#region Properties
public T this[int index]
{
get
{
try
{
this._lock.EnterReadLock();
return this._list[index];
}
finally
{
this._lock.ExitReadLock();
}
}
set
{
try
{
this._lock.EnterWriteLock();
this._list[index] = value;
}
finally
{
this._lock.ExitWriteLock();
}
}
}
public int Count
{
get
{
try
{
this._lock.EnterReadLock();
return this._list.Count;
}
finally
{
this._lock.ExitReadLock();
}
}
}
public bool IsReadOnly
{
get { return false; }
}
#endregion
}
public class ConcurrentEnumerator<T> : IEnumerator<T>
{
#region Fields
private readonly IEnumerator<T> _inner;
private readonly ReaderWriterLockSlim _lock;
#endregion
#region Constructor
public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
{
this._lock = @lock;
this._lock.EnterReadLock();
this._inner = inner.GetEnumerator();
}
#endregion
#region Methods
public bool MoveNext()
{
return _inner.MoveNext();
}
public void Reset()
{
_inner.Reset();
}
public void Dispose()
{
this._lock.ExitReadLock();
}
#endregion
#region Properties
public T Current
{
get { return _inner.Current; }
}
object IEnumerator.Current
{
get { return _inner.Current; }
}
#endregion
}
RemoveAt(int index)
方法在多线程环境下永远不安全。Insert(int index, T item)
方法仅在 index==0
时才是安全的。IndexOf()
方法返回的结果立即过时。更不要说使用 this[int]
了。 - H HReaderWriterLockSlim
可以通过同时使用EnterUpgradeableReadLock()
轻松地造成死锁。但是你没有使用它,也没有将锁暴露给外部,并且你没有在持有读锁时调用进入写锁的方法,因此使用你的类不会使死锁更有可能发生。 - Evgeniy Berezovskyvar l = new ConcurrentList<string>(); /* ... */ l[0] += "asdf";
。一般来说,任何读写组合在并发执行时都可能导致严重问题。这就是为什么并发数据结构通常提供了相应的方法,例如 ConcurrentDictionary
的 AddOrGet
等。注意:您不断重复使用 this.
是多余的,因为下划线已经标记了成员。 - Evgeniy Berezovsky没有ConcurrentList的原因是它本质上无法编写。原因在于IList中的几个重要操作依赖于索引,而这根本行不通。例如:
int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");
作者试图实现的效果是在“猫”之前插入“狗”,但在多线程环境下,在这两行代码之间,列表可能会发生任何事情。例如,另一个线程可能会执行list.RemoveAt(0)
,将整个列表向左移动,但关键是catIndex不会改变。这里的影响是Insert
操作实际上会将“狗”放在猫之后,而不是之前。
您看到作为对此问题的“答案”提供的几种实现都是出于善意,但正如上面所示,它们并不能提供可靠的结果。如果您确实想在多线程环境中使用类似于列表的语义,那么您无法通过在列表实现方法内部放置锁来实现。您必须确保您使用的任何索引完全存在于锁的上下文中。总之,您可以使用带有正确锁定的List在多线程环境中,但是列表本身不能存在于该世界中。
如果您认为需要并发列表,则实际上只有两种可能性:
如果您有一个ConcurrentBag并且需要将其作为IList传递,则存在问题,因为您调用的方法已经指定它们可能会尝试执行上述猫和狗的操作。在大多数情况下,这意味着您正在调用的方法根本不适用于多线程环境。这意味着您要么对其进行重构以使其适用,要么如果无法这样做,则必须非常小心地处理它。您几乎肯定需要创建自己的带有自己锁的集合,并在锁内部调用有问题的方法。
ConcurrentList
(作为可调整大小的数组,而不是链表)不容易使用非阻塞操作编写。它的 API 不太适合“并发”版本。
ConcurrentList
在使用非阻塞操作进行编写并不简单。其 API 也不适合设计成“并发”版本。请注意保留 HTML 标记。var snap = _list; snap[snap.Count - 1];
永远不会(嗯,当然空列表除外)引发异常,您还可以获得带有快照语义的线程安全枚举,这是免费的..我有多么喜欢不可变性!static class CopyOnWriteSwapper
{
public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
where T : class
{
while (true)
{
var objBefore = Volatile.Read(ref obj);
var newObj = cloner(objBefore);
op(newObj);
if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
return;
}
}
}
用法
CopyOnWriteSwapper.Swap(ref _myList,
orig => new List<string>(orig),
clone => clone.Add("asdf"));
cloner
和op
。List<T>
时,您可以使用List.AsReadOnly()来防范修改。注意事项 #3 如果您的数据结构很大并且经常进行修改,使用全量复制的方法可能会在内存消耗和复制所涉及的CPU成本方面具有限制性。在这种情况下,您可能希望改用微软的Immutable Collections。
System.Collections.Generic.List<t>
对于多个读取器已经是线程安全的。尝试使其对于多个写入器线程安全是没有意义的(正如 Henk 和 Stephen 已经提到的原因)。
有些人提出了一些好的观点(以及我的一些想法):
这不是一个答案。这只是一些评论,不太适合特定的位置。
......我的结论是,微软必须对“foreach”进行一些深刻的改变,以使多线程集合更容易使用。此外,它必须遵循自己的IEnumerator使用规则。在那之前,我们可以轻松编写一个MultiThreadList,它将使用阻塞迭代器,但不会遵循“IList”。相反,您将不得不定义自己的“IListPersonnal”接口,在“插入”,“删除”和随机访问器(索引器)失败时可能会出现异常。但如果它不是标准,谁会想使用它呢?
ConcurrentOrderedBag<T>
,其中包括一个只读的 IList<T>
实现,但也提供了一个完全线程安全的 int Add(T value)
方法。 我不明白为什么需要任何 ForEach
更改。尽管微软没有明确说明,但他们的做法表明,如果枚举器无法保证无故障操作,则只有在枚举器枚举创建时存在的集合内容时才需要集合修改异常。 - supercatIEnumerable<T>
包含一个返回类型为 IEnumerable<T>
的“快照”方法。不可变集合可以返回自身;有界集合可以将自己复制到 List<T>
或 T[]
中,并在其上调用 GetEnumerator
。一些无界集合可以实现 Snapshot
,而那些不能的集合则可以在不尝试填充其内容列表的情况下抛出异常。 - supercat我实现了一个类似于Brian's的东西。但我的不同之处在于:
yield return
来生成枚举器。DoSync
和GetSync
方法允许顺序交互,需要对列表进行独占访问。代码:
public class ConcurrentList<T> : IList<T>, IDisposable
{
private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
private int _count = 0;
public int Count
{
get
{
_lock.EnterReadLock();
try
{
return _count;
}
finally
{
_lock.ExitReadLock();
}
}
}
public int InternalArrayLength
{
get
{
_lock.EnterReadLock();
try
{
return _arr.Length;
}
finally
{
_lock.ExitReadLock();
}
}
}
private T[] _arr;
public ConcurrentList(int initialCapacity)
{
_arr = new T[initialCapacity];
}
public ConcurrentList():this(4)
{ }
public ConcurrentList(IEnumerable<T> items)
{
_arr = items.ToArray();
_count = _arr.Length;
}
public void Add(T item)
{
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
_arr[_count] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
public void AddRange(IEnumerable<T> items)
{
if (items == null)
throw new ArgumentNullException("items");
_lock.EnterWriteLock();
try
{
var arr = items as T[] ?? items.ToArray();
var newCount = _count + arr.Length;
EnsureCapacity(newCount);
Array.Copy(arr, 0, _arr, _count, arr.Length);
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
private void EnsureCapacity(int capacity)
{
if (_arr.Length >= capacity)
return;
int doubled;
checked
{
try
{
doubled = _arr.Length * 2;
}
catch (OverflowException)
{
doubled = int.MaxValue;
}
}
var newLength = Math.Max(doubled, capacity);
Array.Resize(ref _arr, newLength);
}
public bool Remove(T item)
{
_lock.EnterUpgradeableReadLock();
try
{
var i = IndexOfInternal(item);
if (i == -1)
return false;
_lock.EnterWriteLock();
try
{
RemoveAtInternal(i);
return true;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public IEnumerator<T> GetEnumerator()
{
_lock.EnterReadLock();
try
{
for (int i = 0; i < _count; i++)
// deadlocking potential mitigated by lock recursion enforcement
yield return _arr[i];
}
finally
{
_lock.ExitReadLock();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public int IndexOf(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item);
}
finally
{
_lock.ExitReadLock();
}
}
private int IndexOfInternal(T item)
{
return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
}
public void Insert(int index, T item)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index > _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
var newCount = _count + 1;
EnsureCapacity(newCount);
// shift everything right by one, starting at index
Array.Copy(_arr, index, _arr, index + 1, _count - index);
// insert
_arr[index] = item;
_count = newCount;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
public void RemoveAt(int index)
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
RemoveAtInternal(index);
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
private void RemoveAtInternal(int index)
{
Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
_count--;
// release last element
Array.Clear(_arr, _count, 1);
}
public void Clear()
{
_lock.EnterWriteLock();
try
{
Array.Clear(_arr, 0, _count);
_count = 0;
}
finally
{
_lock.ExitWriteLock();
}
}
public bool Contains(T item)
{
_lock.EnterReadLock();
try
{
return IndexOfInternal(item) != -1;
}
finally
{
_lock.ExitReadLock();
}
}
public void CopyTo(T[] array, int arrayIndex)
{
_lock.EnterReadLock();
try
{
if(_count > array.Length - arrayIndex)
throw new ArgumentException("Destination array was not long enough.");
Array.Copy(_arr, 0, array, arrayIndex, _count);
}
finally
{
_lock.ExitReadLock();
}
}
public bool IsReadOnly
{
get { return false; }
}
public T this[int index]
{
get
{
_lock.EnterReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
return _arr[index];
}
finally
{
_lock.ExitReadLock();
}
}
set
{
_lock.EnterUpgradeableReadLock();
try
{
if (index >= _count)
throw new ArgumentOutOfRangeException("index");
_lock.EnterWriteLock();
try
{
_arr[index] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
public void DoSync(Action<ConcurrentList<T>> action)
{
GetSync(l =>
{
action(l);
return 0;
});
}
public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
{
_lock.EnterWriteLock();
try
{
return func(this);
}
finally
{
_lock.ExitWriteLock();
}
}
public void Dispose()
{
_lock.Dispose();
}
}
Remove
的try
块的开头或索引器(setter)的开头会发生什么? - JamesIList
语义的有效性非常有限。我写这段代码的时间可能是在我意识到这一点之前。我的经验与被接受答案的作者相同:我根据我对同步和IList<T>的了解尝试过,通过这样做我学到了一些东西。 - Ronnie Overby