使“在枚举时修改”集合线程安全

6
我想创建一个线程安全的集合,即使在枚举时也可以修改。
样例代码中的ActionSet类存储Action处理程序。它有Add方法来将新处理程序添加到列表中,还有Invoke方法来枚举并调用收集的所有Action处理程序。预期的工作场景包括频繁枚举和偶尔在枚举过程中修改。
普通集合如果您在枚举尚未结束时使用Add方法进行修改,则会抛出异常。
解决问题的一种简单但慢速的方法是:在枚举之前克隆集合。
class ThreadSafeSlowActionSet {
    List<Action> _actions = new List<Action>();

    public void Add(Action action) {
        lock(_actions) {
            _actions.Add(action);
        }
    }

    public void Invoke() {
        lock(_actions) {
            List<Action> actionsClone = _actions.ToList();
        }
        foreach (var action in actionsClone ) {
            action();
        }
    }
}

这个解决方案的问题在于枚举开销,我希望枚举非常快。
我创建了一个相当快速的“递归安全”集合,即使在枚举时也允许添加新值。如果您在主要的 _actions 集合被枚举时添加新值,则这些值将添加到临时的 _delta 集合而不是主要集合中。完成所有枚举后, _delta 值将添加到 _actions 集合中。如果您在主要的 _actions 集合被枚举时添加了一些新值(创建了 _delta 集合),然后再次进入Invoke方法,我们必须创建一个新的合并集合( _actions + _delta )并用其替换 _actions
因此,这个集合看起来是“递归安全”的,但我想使它成为线程安全的。我认为我需要使用 Interlocked.* 结构、 System.Threading 类和其他同步原语来使这个集合线程安全,但我没有好的想法如何做到这一点。
如何使这个集合线程安全?
class RecursionSafeFastActionSet {
    List<Action> _actions = new List<Action>(); //The main store
    List<Action> _delta; //Temporary buffer for storing added values while the main store is being enumerated
    int _lock = 0; //The number of concurrent Invoke enumerations

    public void Add(Action action) {
        if (_lock == 0) { //_actions list is not being enumerated and can be modified
            _actions.Add(action);
        } else { //_actions list is being enumerated and cannot be modified
            if (_delta == null) {
                _delta = new List<Action>();
            }
            _delta.Add(action); //Storing the new values in the _delta buffer
        }
    }

    public void Invoke() {
        if (_delta != null) { //Re-entering Invoke after calling Add:  Invoke->Add,Invoke
            Debug.Assert(_lock > 0);
            var newActions = new List<Action>(_actions); //Creating a new list for merging delta
            newActions.AddRange(_delta); //Merging the delta
            _delta = null;
            _actions = newActions; //Replacing the original list (which is still being iterated)
        }
        _lock++;
        foreach (var action in _actions) {
            action();
        }
        _lock--;
        if (_lock == 0 && _delta != null) {
            _actions.AddRange(_delta); //Merging the delta
            _delta = null;
        }
    }
}

更新:新增了ThreadSafeSlowActionSet变体。

需要确认规范。如果在“Invoke”期间调用“Add”,则添加的“action”应在该轮“Invoke”中执行,稍后执行或未指定(即任何一种都可以)。 - tia
我们能保证所有后续的 Invoke 调用会调用之前添加的所有操作吗? - tia
你是否已经查看了并发命名空间 http://msdn.microsoft.com/en-us/library/system.collections.concurrent.aspx? - Prabhu Murthy
@tia 规范非常简单。想象一下,每次调用Invoke时都会复制列表,并对复制进行枚举。ActionSet应该具有相同的行为(不同之处在于性能会更好,因为列表不会在每次调用中复制 - 仅在Invoke->Add,Invoke情况下才会复制)。所以,1)添加的action只会在下一个Invoke中执行。2)我认为后续的Invoke(并发或非并发)应该调用之前添加的所有操作。 - Ark-kun
@JonRea 谢谢。我知道 Interlocked.Increment 的用法。但是它在这里并不能帮助我。因为还需要进行其他检查等操作,要使这段代码变得线程安全需要更多的工作。 - Ark-kun
显示剩余4条评论
3个回答

3

一种更简单的方法(例如,ConcurrentBag中使用的方法)是使GetEnumerator()返回一个枚举器,该枚举器遍历集合内容的快照。在您的情况下,代码可能如下所示:

public IEnumerator<Action> GetEnumerator()
{
    lock(sync)
    {
        return _actions.ToList().GetEnumerator();
    }
}

如果您这样做,就不需要使用 _delta 字段及其增加的复杂性。

对于OP经常枚举的问题,您可能希望以线程安全的方式缓存ToList()结果,并在任何Add/Remove操作时使缓存失效。这会略微复杂,但仍然优于起始点。 - Damien_The_Unbeliever
@Damien_The_Unbeliever,你的变体肯定比@Joe的变体更好(我应该从一开始就将其添加到问题中作为“ThreadSafeSlowActionSet”)。尽管如此,修改仍然相当常见,而列表克隆将降低性能(最重要的是它会在WP7上引起频繁的GC-caused滞后)。 - Ark-kun

1

这是您的课程修改后的线程安全版本:

class SafeActionSet
{
    Object _sync = new Object();
    List<Action> _actions = new List<Action>(); //The main store
    List<Action> _delta = new List<Action>();   //Temporary buffer for storing added values while the main store is being enumerated
    int _lock = 0; //The number of concurrent Invoke enumerations

    public void Add(Action action)
    {
        lock(sync)
        {
            if (0 == _lock)
            { //_actions list is not being enumerated and can be modified
                _actions.Add(action);
            }
            else
            { //_actions list is being enumerated and cannot be modified
                _delta.Add(action); //Storing the new values in the _delta buffer
            }
        }
    }

    public void Invoke()
    {
        lock(sync)
        {
            if (0 < _delta.Count)
            { //Re-entering Invoke after calling Add:  Invoke->Add,Invoke
                Debug.Assert(0 < _lock);
                var newActions = new List<Action>(_actions); //Creating a new list for merging delta
                newActions.AddRange(_delta); //Merging the delta
                _delta.Clear();
                _actions = newActions; //Replacing the original list (which is still being iterated)
            }
            ++_lock;
        }
        foreach (var action in _actions)
        {
            action();
        }
        lock(sync)
        {
            --_lock;
            if ((0 == _lock) && (0 < _delta.Count))
            {
                _actions.AddRange(_delta); //Merging the delta
                _delta.Clear();
            }
        }
    }
}

我进行了一些其他的调整,原因如下:

  • 将IF表达式反转,使得常量值在前面,这样如果我打错字,把“=”写成“==”或“!=”等等,编译器会立即告诉我错误。这是我养成的习惯,因为我的大脑和手指经常不同步。
  • 预分配_delta,并调用.Clear()而不是将其设置为null,因为我发现这样更容易阅读。
  • 各种lock(_sync) {...}可以确保你在访问所有实例变量时都是线程安全的。:( 除了枚举本身中对 _action 的访问之外。):

if 条件语句中没有必要把常量放在前面(也称为 Yoda conditions),如果你写成 = 而不是 ==,除非变量是 Boolean 类型,否则代码编译时会失败,并且编译器将发出警告。 - Allon Guralnek
啊,尤达条件! :) http://www.codinghorror.com/blog/2012/07/new-programming-jargon.html - Matěj Zábský
抱歉我走神了。一开始看这个解决方案时,它似乎不是死锁/异常安全的,所以我基于ReaderWriterLockSlim创建了一个可怕的200行实现 =)。现在我看到这个解决方案真的是线程安全的。我怎么没看出来呢... - Ark-kun
@AllonGuralnek - 实际上在“C”语言中,如果x是一个int变量,if (x = 5) ...可以顺利编译,并且始终为真。而if (x = 0) ...可以正常编译,但始终为假。你说得对,在“C++”中,x必须是一个Boolean才能应用于if (x = someExpression ) ...。将一个常量表达式放在if语句中可能会得到警告,但错误会强制我进行修复,而警告可能会被忽略。是的,我知道使用编译器来改善我的坏习惯本身就是个坏习惯。;-) - Jesse Chisholm

0

由于我实际上也需要从集合中删除项目,所以我最终使用的实现是基于重写的LinkedList,它在删除/插入时锁定相邻节点,并且在枚举期间如果集合被更改不会抱怨。 我还添加了一个字典,使元素搜索更快。


你只锁定相邻的节点吗?你考虑过死锁问题并检查它是否比每个列表锁更快吗? - Eamon Nerbonne
我正在锁定3个节点(前一个、当前和下一个),并在循环中检查,直到“previous.Next == current”。这个顺序应该避免死锁。 - Ark-kun
但是你说得对。我忘记了我实际上已经抛弃了那段代码。我想创建一个通用的线程安全的单/双向链表,然后将其包装在一个带有Dictionary快速访问的类中。然后我注意到我仍然需要锁定整个结构,因为我必须锁定Dictionary。所以我从链表类中删除了所有锁和通用功能,并开始仅锁定主要的基于Dictionary的部分。 - Ark-kun

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接