.NET 4.0 中没有 ConcurrentList<T> 吗?

223

看到 .Net 4.0 中新增的 System.Collections.Concurrent 命名空间,我感到非常兴奋,这很棒!我见过ConcurrentDictionaryConcurrentQueueConcurrentStackConcurrentBagBlockingCollection

有一件事情似乎莫名其妙地缺失了,那就是一个 ConcurrentList<T>。难道我必须自己编写它(或从网络上获取 :))?

这里有什么明显的遗漏吗?


8
ConcurrentBag<T> 是一个线程安全的集合,可以在并发环境中添加和移除元素。它提供了一种无序的方式来存储对象,并可用于高性能应用程序的线程安全操作。 - Rodrigo Reis
8
@RodrigoReis,ConcurrentBag<T>是无序集合,而List<T>是有序的。 - Adam Calvet Bohl
5
在多线程环境中如何可能拥有一个有序的集合?按设计,您永远无法控制元素的顺序。 - Jeremy Holovacs
使用锁代替。 - Erik Bergstedt
dotnet源代码中有一个名为ThreadSafeList.cs的文件,它看起来很像下面的一些代码。它也使用了ReaderWriterLockSlim,我正在尝试弄清楚为什么要使用它而不是简单的lock(obj)? - colin lamarre
好的,我明白了。这是一个性能改进,因为读取锁定实际上不会锁定,除非有写入操作正在进行,如果您有多个线程读取相同的列表,则非常有用。 - colin lamarre
12个回答

183

我曾经尝试过实现线程安全的、无锁的可调整大小的数组,详见此处(另外还有在GitHub上)。但是我的实现存在一些问题,在此不再赘述。最重要的是,我学到了什么。

首先,你不可能得到一个完全的、无锁的、线程安全的IList<T>实现。特别是,随机插入和删除是行不通的,除非你也放弃O(1)的随机访问(也就是说,除非你“作弊”,只使用某种形式的链表,并让索引变得低效)。

我认为值得尝试的是一个线程安全的、有限的IList<T>子集:特别是,它允许一个Add操作,并提供根据索引的随机只读访问(但没有InsertRemoveAt等,也没有随机写访问)。

这是我的ConcurrentList<T>实现的目标。但是当我在多线程场景下测试它的性能时,发现List<T>的简单同步添加更快。基本上,向一个List<T>添加元素已经非常快了;其中涉及的计算步骤的复杂度微不足道(增加索引并将其赋值给数组中的一个元素,就这样)。你需要大量并发写操作才能看到任何锁争用,即使如此,每个写操作的平均性能仍然能够击败更昂贵、但无锁的ConcurrentList<T>的实现。

在列表的内部数组需要重新调整大小的相对罕见情况下,你需要支付一定的代价。因此,我最终得出结论:这是唯一一个需要添加功能的ConcurrentList<T>集合类型在性能方面保证低开销的场景,每次添加元素都有保障(与平均性能目标相反)。
这个类并不像你想象中那么有用。

59
如果你需要类似于“List<T>”的东西,使用旧式基于监视器的同步,那么BCL中隐藏着“SynchronizedCollection<T>”:http://msdn.microsoft.com/en-us/library/ms668265.aspx。 - LukeH
9
一个小提示:使用Capacity构造函数参数来尽可能避免重新调整大小的情况。 - H H
2
ConcurrentList 最大的优势场景是在列表中添加的活动不是很多,但有许多并发读取器的情况下。可以将读取器的开销减少到单个内存屏障(如果读取器不关心稍微过时的数据,甚至可以消除这个屏障)。 - supercat
2
@Kevin:构建一个ConcurrentList<T>,使读者无需任何锁定即可保证看到一致的状态,相对较小的开销就可以实现。当列表从32扩展到64时,保留大小为32的数组并创建一个新的大小为64的数组。在添加下一个32个项目时,将其放入新数组的32-63槽中,并将旧数组中的一个项目复制到新数组中。在添加第64个项目之前,读者将查找0-31项的大小为32的数组和32-63项的大小为64的数组。 - supercat
2
一旦添加了第64个项目,大小为32的数组仍然适用于获取0-31项,但读取器将不再需要使用它。他们可以使用大小为64的数组来处理所有0-63项,并使用大小为128的数组来处理64-127项。选择使用两个数组中的哪一个以及内存屏障(如果需要)的开销,将小于即使是最有效的读写锁的开销。写入应该使用锁(无锁可能是可行的,特别是如果不介意在每次插入时创建新对象实例,但锁应该是便宜的)。 - supercat
显示剩余12条评论

40

你会用ConcurrentList做什么?

在多线程环境下,随机访问容器的概念并不像它看起来的那样有用。这个说法

  if (i < MyConcurrentList.Count)  
      x = MyConcurrentList[i]; 

作为一个整体仍然不能保证线程安全。

不要创建ConcurrentList,尝试使用已有的解决方案。最常用的类是 ConcurrentBag 和尤其是 BlockingCollection。


@Henk - 我不明白你的论点。为什么你不会遇到与 if (!dict.ContainsKey(somekey)) dict[someKey] = someValue 相同类型的问题呢?使用 ConcurrentDictionary 时,这段代码是否也会存在线程安全问题? - dcp
1
@Alan:没有锁定列表就无法实现。既然您已经可以使用“Monitor”来完成这个任务,那么就没有必要使用并发列表了。 - Billy ONeal
6
@dcp - 是的,这个本质上是不支持多线程安全的。ConcurrentDictionary有一些特殊的方法,可以在一个原子操作中完成这个过程,比如AddOrUpdate、GetOrAdd、TryUpdate等。它们仍然有ContainsKey方法,因为有时你只想知道字典中是否存在该键,而不想修改它(类似于HashSet)。 - Zarat
3
ContainsKey本身是线程安全的,你的例子(不是ContainsKey!)只有在第一个决策已经过时的情况下才会进行第二个调用,从而产生了竞态条件。 - Zarat
2
Henk,我不同意。我认为有一个简单的场景,它可能非常有用。工作线程写入数据,UI线程读取并相应地更新界面。如果您想以排序方式添加项目,则需要随机访问写入。您还可以使用堆栈和数据视图,但是您将不得不维护两个集合:-(。 - Eric Ouellet
显示剩余7条评论

21

尊重提供的精彩答案,有时我只想要一个线程安全的IList。没有高级的功能或花哨的东西。性能在许多情况下很重要,但有时这并不是需要考虑的问题。是的,“TryGetValue”等方法总会存在挑战,但大多数情况下,我只希望得到一个可以枚举而无需担心将锁放在所有内容周围的东西。是的,有人可能会在我的实现中找到一些“错误”,导致死锁或其他问题(我想),但说实话:当涉及到多线程时,如果您没有正确编写代码,它就会去死锁了。基于此,我决定制作一个简单的ConcurrentList实现,以满足这些基本需求。

值得一提的是:我进行了一个简单的测试,向普通List和ConcurrentList添加了10,000,000个项,结果如下:

List 完成时间: 7793 毫秒。 Concurrent 完成时间: 8064 毫秒。

public class ConcurrentList<T> : IList<T>, IDisposable
{
    #region Fields
    private readonly List<T> _list;
    private readonly ReaderWriterLockSlim _lock;
    #endregion

    #region Constructors
    public ConcurrentList()
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>();
    }

    public ConcurrentList(int capacity)
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>(capacity);
    }

    public ConcurrentList(IEnumerable<T> items)
    {
        this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
        this._list = new List<T>(items);
    }
    #endregion

    #region Methods
    public void Add(T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Add(item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public void Insert(int index, T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Insert(index, item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        try
        {
            this._lock.EnterWriteLock();
            return this._list.Remove(item);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public void RemoveAt(int index)
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.RemoveAt(index);
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public int IndexOf(T item)
    {
        try
        {
            this._lock.EnterReadLock();
            return this._list.IndexOf(item);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public void Clear()
    {
        try
        {
            this._lock.EnterWriteLock();
            this._list.Clear();
        }
        finally
        {
            this._lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        try
        {
            this._lock.EnterReadLock();
            return this._list.Contains(item);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        try
        {
            this._lock.EnterReadLock();
            this._list.CopyTo(array, arrayIndex);
        }
        finally
        {
            this._lock.ExitReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        return new ConcurrentEnumerator<T>(this._list, this._lock);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return new ConcurrentEnumerator<T>(this._list, this._lock);
    }

    ~ConcurrentList()
    {
        this.Dispose(false);
    }

    public void Dispose()
    {
        this.Dispose(true);
    }

    private void Dispose(bool disposing)
    {
        if (disposing)
            GC.SuppressFinalize(this);

        this._lock.Dispose();
    }
    #endregion

    #region Properties
    public T this[int index]
    {
        get
        {
            try
            {
                this._lock.EnterReadLock();
                return this._list[index];
            }
            finally
            {
                this._lock.ExitReadLock();
            }
        }
        set
        {
            try
            {
                this._lock.EnterWriteLock();
                this._list[index] = value;
            }
            finally
            {
                this._lock.ExitWriteLock();
            }
        }
    }

    public int Count
    {
        get
        {
            try
            {
                this._lock.EnterReadLock();
                return this._list.Count;
            }
            finally
            {
                this._lock.ExitReadLock();
            }
        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
    #endregion
}

    public class ConcurrentEnumerator<T> : IEnumerator<T>
{
    #region Fields
    private readonly IEnumerator<T> _inner;
    private readonly ReaderWriterLockSlim _lock;
    #endregion

    #region Constructor
    public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock)
    {
        this._lock = @lock;
        this._lock.EnterReadLock();
        this._inner = inner.GetEnumerator();
    }
    #endregion

    #region Methods
    public bool MoveNext()
    {
        return _inner.MoveNext();
    }

    public void Reset()
    {
        _inner.Reset();
    }

    public void Dispose()
    {
        this._lock.ExitReadLock();
    }
    #endregion

    #region Properties
    public T Current
    {
        get { return _inner.Current; }
    }

    object IEnumerator.Current
    {
        get { return _inner.Current; }
    }
    #endregion
}

8
好的,虽然这个答案有点旧,但是它仍然适用:RemoveAt(int index) 方法在多线程环境下永远不安全。Insert(int index, T item) 方法仅在 index==0 时才是安全的。IndexOf() 方法返回的结果立即过时。更不要说使用 this[int] 了。 - H H
3
你不需要也不想要一个~Finalizer()。 - H H
2
你说你已经放弃了防止死锁的可能性 - 一个单独的ReaderWriterLockSlim可以通过同时使用EnterUpgradeableReadLock()轻松地造成死锁。但是你没有使用它,也没有将锁暴露给外部,并且你没有在持有读锁时调用进入写锁的方法,因此使用你的类不会使死锁更有可能发生。 - Evgeniy Berezovsky
1
非并发接口不适合并发访问。例如,以下代码不是原子性的 var l = new ConcurrentList<string>(); /* ... */ l[0] += "asdf";。一般来说,任何读写组合在并发执行时都可能导致严重问题。这就是为什么并发数据结构通常提供了相应的方法,例如 ConcurrentDictionaryAddOrGet 等。注意:您不断重复使用 this. 是多余的,因为下划线已经标记了成员。 - Evgeniy Berezovsky
1
谢谢Eugene。我是.NET Reflector的重度用户,它会在所有非静态字段上放置"this."。因此,我已经习惯了这种方式。关于这个非并发接口不合适的问题:您绝对正确,尝试对我的实现执行多个操作可能变得不可靠。但是这里的要求很简单,即可以执行单个操作(添加、删除、清除或枚举)而不破坏集合。这基本上消除了在所有内容周围放置锁语句的需要。 - Brian Booth

19

没有ConcurrentList的原因是它本质上无法编写。原因在于IList中的几个重要操作依赖于索引,而这根本行不通。例如:

int catIndex = list.IndexOf("cat");
list.Insert(catIndex, "dog");

作者试图实现的效果是在“猫”之前插入“狗”,但在多线程环境下,在这两行代码之间,列表可能会发生任何事情。例如,另一个线程可能会执行list.RemoveAt(0),将整个列表向左移动,但关键是catIndex不会改变。这里的影响是Insert操作实际上会将“狗”放在猫之后,而不是之前。

您看到作为对此问题的“答案”提供的几种实现都是出于善意,但正如上面所示,它们并不能提供可靠的结果。如果您确实想在多线程环境中使用类似于列表的语义,那么您无法通过在列表实现方法内部放置锁来实现。您必须确保您使用的任何索引完全存在于锁的上下文中。总之,您可以使用带有正确锁定的List在多线程环境中,但是列表本身不能存在于该世界中。

如果您认为需要并发列表,则实际上只有两种可能性:

  1. 您实际上需要ConcurrentBag
  2. 您需要创建自己的集合,可能使用List和自己的并发控制实现。

如果您有一个ConcurrentBag并且需要将其作为IList传递,则存在问题,因为您调用的方法已经指定它们可能会尝试执行上述猫和狗的操作。在大多数情况下,这意味着您正在调用的方法根本不适用于多线程环境。这意味着您要么对其进行重构以使其适用,要么如果无法这样做,则必须非常小心地处理它。您几乎肯定需要创建自己的带有自己锁的集合,并在锁内部调用有问题的方法。


12

ConcurrentList(作为可调整大小的数组,而不是链表)不容易使用非阻塞操作编写。它的 API 不太适合“并发”版本。

作为可调整大小的数组,ConcurrentList 在使用非阻塞操作进行编写并不简单。其 API 也不适合设计成“并发”版本。请注意保留 HTML 标记。

14
不仅写作困难,甚至很难想出有用的界面。 - CodesInChaos

6
读操作远多于写操作或(不论其频率)写操作非并发的情况下,写时复制方法可能是合适的选择。
下面展示的实现方式:
  • 无锁
  • 针对并发读取极快,即使进行并发修改 - 无论需要多长时间
  • 由于“快照”是不可变的,无需锁的原子性是可能的,即var snap = _list; snap[snap.Count - 1];永远不会(嗯,当然空列表除外)引发异常,您还可以获得带有快照语义的线程安全枚举,这是免费的..我有多么喜欢不可变性!
  • 通用实现,适用于任何数据结构任何类型的修改
  • 极其简单,易于测试、调试和通过代码阅读验证
  • 可在 .Net 3.5 中使用
为了让写时复制起作用,您必须使数据结构有效地不可变,即在将其提供给其他线程后,不允许任何人更改它们。当您想要修改时,
  1. 克隆该结构
  2. 对克隆进行修改
  3. 原子性地交换对修改后的克隆的引用
代码
static class CopyOnWriteSwapper
{
    public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op)
        where T : class
    {
        while (true)
        {
            var objBefore = Volatile.Read(ref obj);
            var newObj = cloner(objBefore);
            op(newObj);
            if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore)
                return;
        }
    }
}

用法

CopyOnWriteSwapper.Swap(ref _myList,
    orig => new List<string>(orig),
    clone => clone.Add("asdf"));

如果您需要更高的性能,最好取消泛化方法,例如为每种修改类型(添加、删除等)创建一个方法,并硬编码函数指针clonerop
注意事项1:您有责任确保没有人修改(假定为)不可变的数据结构。在通用实现中,我们无法防止这种情况,但是当专门针对List<T>时,您可以使用List.AsReadOnly()来防范修改。
注意事项2:要注意列表中的值。上面的写入副本方法仅保护它们的列表成员身份,但如果您放入的不是字符串而是其他可变对象,则必须注意线程安全性(例如锁定)。但这与此解决方案无关,例如可轻松使用可变值的锁定而不会出现问题。您只需要注意这一点即可。

注意事项 #3 如果您的数据结构很大并且经常进行修改,使用全量复制的方法可能会在内存消耗和复制所涉及的CPU成本方面具有限制性。在这种情况下,您可能希望改用微软的Immutable Collections


5

System.Collections.Generic.List<t> 对于多个读取器已经是线程安全的。尝试使其对于多个写入器线程安全是没有意义的(正如 Henk 和 Stephen 已经提到的原因)。


你无法想象我可能有5个线程添加到一个列表的情况吗?这样,即使它们全部终止之前,你也可以看到列表累积记录。 - Alan
10
@Alan - 那将是ConcurrentQueue、ConcurrentStack或ConcurrentBag。如果要理解ConcurrentList,您应该提供一个可用类不足的用例。我不明白为什么我会希望在索引访问时,元素可以通过并发删除随机更改索引位置。对于“锁定”读取,您已经可以对现有的并发类进行快照,并将它们放入列表中。 - Zarat
3
我认为你错了。说“适用于多个读者”的意思并不是说你不能同时写入。写入也可能意味着删除,如果你在迭代中删除,就会收到错误提示。 - Eric Ouellet
@Eric:我从未说过你可以同时编写。你只能有一个编写者,或者可以有多个读取者,但不能同时存在。 - Billy ONeal
@Eugene:这正是我的答案所说的。我知道问题要求这样做,但问题的根本要求毫无意义。 - Billy ONeal
显示剩余3条评论

2

有些人提出了一些好的观点(以及我的一些想法):

  • 禁用随机访问器(索引器)可能看起来有些疯狂,但对我来说似乎还好。你只需要考虑到多线程集合中有许多方法可能会失败,如Indexer和Delete。您还可以为写访问器定义失败(回退)操作,例如“失败”或简单地“添加到末尾”。
  • 它并不是因为它是一个多线程集合就一定会在多线程环境中使用。或者它也可以被一个作者和一个读者使用。
  • 另一种安全使用索引器的方法是将操作包装到集合的锁定中,使用其根(如果公开)。
  • 对于许多人来说,使rootLock可见违反了“良好实践”。我对这一点不是100%确定,因为如果它被隐藏,您将为用户删除很多灵活性。我们始终要记住,编写多线程程序不适合所有人。我们无法防止每种错误的用法。
  • Microsoft将不得不做一些工作,并定义一些新标准,以介绍正确使用多线程集合的方法。首先,IEnumerator不应该有moveNext,而应该有一个GetNext,返回true或false,并获得类型为T的out参数(这样迭代将不再阻塞)。此外,Microsoft已经在foreach中内部使用“using”,但有时直接使用IEnumerator而没有用“using”进行包装(这是集合视图中的错误,可能还有更多地方)-包装IEnumerator的使用是Microsoft建议的做法。这个错误消除了安全迭代器的潜力...迭代器在构造函数中锁定集合,并在其Dispose方法中解锁-用于阻塞foreach方法。

这不是一个答案。这只是一些评论,不太适合特定的位置。

......我的结论是,微软必须对“foreach”进行一些深刻的改变,以使多线程集合更容易使用。此外,它必须遵循自己的IEnumerator使用规则。在那之前,我们可以轻松编写一个MultiThreadList,它将使用阻塞迭代器,但不会遵循“IList”。相反,您将不得不定义自己的“IListPersonnal”接口,在“插入”,“删除”和随机访问器(索引器)失败时可能会出现异常。但如果它不是标准,谁会想使用它呢?


一个人可以很容易地编写一个 ConcurrentOrderedBag<T>,其中包括一个只读的 IList<T> 实现,但也提供了一个完全线程安全的 int Add(T value) 方法。 我不明白为什么需要任何 ForEach 更改。尽管微软没有明确说明,但他们的做法表明,如果枚举器无法保证无故障操作,则只有在枚举器枚举创建时存在的集合内容时才需要集合修改异常。 - supercat
遍历MT集合时,它的设计方式可能会导致异常,就像你说的那样...我不知道是哪一个异常。你会捕获所有异常吗?在我的书中,异常就是异常,不应该在代码的正常执行中发生。否则,为了防止异常,你必须锁定集合或获取副本(以安全的方式-即锁定),或者在集合中实现非常复杂的机制,以防止由于并发而发生异常。我的想法是添加一个IEnumeratorMT,在for each发生时锁定集合并添加相关代码,这将很好。 - Eric Ouellet
最有帮助的是如果 IEnumerable<T> 包含一个返回类型为 IEnumerable<T> 的“快照”方法。不可变集合可以返回自身;有界集合可以将自己复制到 List<T>T[] 中,并在其上调用 GetEnumerator。一些无界集合可以实现 Snapshot,而那些不能的集合则可以在不尝试填充其内容列表的情况下抛出异常。 - supercat
我基本上同意。快照会非常好。而在“GetEnumerator”上加锁很容易导致你所说的问题,因为人们不会预料到这种行为。我不是说我有完美的解决方案,但是没有锁的迭代器是危险的,实现应该肯定要防止这种情况发生。快照的问题是对于大型集合来说性能较差。我认为理想情况下,MT集合应该根据其所需的用途(主要用途)以多种方式实现。使用快照(就像你所说的那样),并且还可以使用允许在迭代时修改的机制(这会减慢插入/删除速度)。 - Eric Ouellet
感谢supercat的评论,我很感激。 - Eric Ouellet
显示剩余6条评论

1

我实现了一个类似于Brian's的东西。但我的不同之处在于:

  • 我直接管理数组。
  • 我不在try块内输入锁。
  • 我使用yield return来生成枚举器。
  • 我支持锁递归。这允许在迭代期间从列表中读取。
  • 我尽可能使用可升级的读锁。
  • DoSyncGetSync方法允许顺序交互,需要对列表进行独占访问。

代码

public class ConcurrentList<T> : IList<T>, IDisposable
{
    private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
    private int _count = 0;

    public int Count
    {
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _count;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    public int InternalArrayLength
    { 
        get
        { 
            _lock.EnterReadLock();
            try
            {           
                return _arr.Length;
            }
            finally
            {
                _lock.ExitReadLock();
            }
        }
    }

    private T[] _arr;

    public ConcurrentList(int initialCapacity)
    {
        _arr = new T[initialCapacity];
    }

    public ConcurrentList():this(4)
    { }

    public ConcurrentList(IEnumerable<T> items)
    {
        _arr = items.ToArray();
        _count = _arr.Length;
    }

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {       
            var newCount = _count + 1;          
            EnsureCapacity(newCount);           
            _arr[_count] = item;
            _count = newCount;                  
        }
        finally
        {
            _lock.ExitWriteLock();
        }       
    }

    public void AddRange(IEnumerable<T> items)
    {
        if (items == null)
            throw new ArgumentNullException("items");

        _lock.EnterWriteLock();

        try
        {           
            var arr = items as T[] ?? items.ToArray();          
            var newCount = _count + arr.Length;
            EnsureCapacity(newCount);           
            Array.Copy(arr, 0, _arr, _count, arr.Length);       
            _count = newCount;
        }
        finally
        {
            _lock.ExitWriteLock();          
        }
    }

    private void EnsureCapacity(int capacity)
    {   
        if (_arr.Length >= capacity)
            return;

        int doubled;
        checked
        {
            try
            {           
                doubled = _arr.Length * 2;
            }
            catch (OverflowException)
            {
                doubled = int.MaxValue;
            }
        }

        var newLength = Math.Max(doubled, capacity);            
        Array.Resize(ref _arr, newLength);
    }

    public bool Remove(T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {           
            var i = IndexOfInternal(item);

            if (i == -1)
                return false;

            _lock.EnterWriteLock();
            try
            {   
                RemoveAtInternal(i);
                return true;
            }
            finally
            {               
                _lock.ExitWriteLock();
            }
        }
        finally
        {           
            _lock.ExitUpgradeableReadLock();
        }
    }

    public IEnumerator<T> GetEnumerator()
    {
        _lock.EnterReadLock();

        try
        {    
            for (int i = 0; i < _count; i++)
                // deadlocking potential mitigated by lock recursion enforcement
                yield return _arr[i]; 
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    public int IndexOf(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item);
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    private int IndexOfInternal(T item)
    {
        return Array.FindIndex(_arr, 0, _count, x => x.Equals(item));
    }

    public void Insert(int index, T item)
    {
        _lock.EnterUpgradeableReadLock();

        try
        {                       
            if (index > _count)
                throw new ArgumentOutOfRangeException("index"); 

            _lock.EnterWriteLock();
            try
            {       
                var newCount = _count + 1;
                EnsureCapacity(newCount);

                // shift everything right by one, starting at index
                Array.Copy(_arr, index, _arr, index + 1, _count - index);

                // insert
                _arr[index] = item;     
                _count = newCount;
            }
            finally
            {           
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }


    }

    public void RemoveAt(int index)
    {   
        _lock.EnterUpgradeableReadLock();
        try
        {   
            if (index >= _count)
                throw new ArgumentOutOfRangeException("index");

            _lock.EnterWriteLock();
            try
            {           
                RemoveAtInternal(index);
            }
            finally
            {
                _lock.ExitWriteLock();
            }
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();            
        }
    }

    private void RemoveAtInternal(int index)
    {           
        Array.Copy(_arr, index + 1, _arr, index, _count - index-1);
        _count--;

        // release last element
        Array.Clear(_arr, _count, 1);
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {        
            Array.Clear(_arr, 0, _count);
            _count = 0;
        }
        finally
        {           
            _lock.ExitWriteLock();
        }   
    }

    public bool Contains(T item)
    {
        _lock.EnterReadLock();
        try
        {   
            return IndexOfInternal(item) != -1;
        }
        finally
        {           
            _lock.ExitReadLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {       
        _lock.EnterReadLock();
        try
        {           
            if(_count > array.Length - arrayIndex)
                throw new ArgumentException("Destination array was not long enough.");

            Array.Copy(_arr, 0, array, arrayIndex, _count);
        }
        finally
        {
            _lock.ExitReadLock();           
        }
    }

    public bool IsReadOnly
    {   
        get { return false; }
    }

    public T this[int index]
    {
        get
        {
            _lock.EnterReadLock();
            try
            {           
                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                return _arr[index]; 
            }
            finally
            {
                _lock.ExitReadLock();               
            }           
        }
        set
        {
            _lock.EnterUpgradeableReadLock();
            try
            {

                if (index >= _count)
                    throw new ArgumentOutOfRangeException("index");

                _lock.EnterWriteLock();
                try
                {                       
                    _arr[index] = value;
                }
                finally
                {
                    _lock.ExitWriteLock();              
                }
            }
            finally
            {
                _lock.ExitUpgradeableReadLock();
            }

        }
    }

    public void DoSync(Action<ConcurrentList<T>> action)
    {
        GetSync(l =>
        {
            action(l);
            return 0;
        });
    }

    public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func)
    {
        _lock.EnterWriteLock();
        try
        {           
            return func(this);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    public void Dispose()
    {   
        _lock.Dispose();
    }
}

如果两个线程同时进入Removetry块的开头或索引器(setter)的开头会发生什么? - James
@James,那似乎不可能。请阅读https://msdn.microsoft.com/en-us/library/system.threading.readerwriterlockslim.enterupgradeablereadlock%28v=vs.110%29.aspx中的注释。运行此代码后,您将永远不会第二次进入该锁:https://gist.github.com/ronnieoverby/59b715c3676127a113c3 - Ronnie Overby
@Ronny Overby:有趣。鉴于此,我怀疑如果您从所有函数中删除UpgradableReadLock,在可升级读锁和写锁之间执行的唯一操作是检查参数是否超出范围,则性能会更好-任何类型的锁的开销都比检查参数是否超出范围要大得多,因此在写锁内部执行该检查可能会更好。 - James
这个类似乎也不是很有用,因为基于偏移量的函数(其中大部分)实际上无法安全地使用,除非有一些外部锁定机制,因为在您决定放置或获取某些内容的位置和实际获取它之间,集合可能会发生变化。 - James
@James 所以不要使用它。 - Ronnie Overby
1
我想说一下,我认识到在并发场景中使用IList语义的有效性非常有限。我写这段代码的时间可能是在我意识到这一点之前。我的经验与被接受答案的作者相同:我根据我对同步和IList<T>的了解尝试过,通过这样做我学到了一些东西。 - Ronnie Overby

1
在顺序执行代码中,使用的数据结构与(良好编写的)并发执行代码不同。原因是顺序代码暗示了隐含的顺序。然而,并发代码不暗示任何顺序;更好的是,它暗示了缺乏任何定义的顺序!
由于这个原因,具有暗示顺序的数据结构(如列表)对于解决并发问题并不是非常有用。列表暗示了顺序,但它没有清楚地定义那个顺序。因此,操作列表的代码的执行顺序将在一定程度上决定列表的隐含顺序,这与高效的并发解决方案直接冲突。
请记住,并发是一个数据问题,而不是代码问题!您不能先实现代码(或重写现有的顺序代码),然后获得一个设计良好的并发解决方案。您需要首先设计数据结构,同时记住在并发系统中不存在隐含顺序。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接