.NET Framework中的并发HashSet<T>?

217

我有以下的类。

class Test{
    public HashSet<string> Data = new HashSet<string>();
}

我需要从不同的线程更改“Data”字段,因此我希望获得有关我的当前线程安全实现的一些意见。

class Test{
    public HashSet<string> Data = new HashSet<string>();

    public void Add(string Val){
            lock(Data) Data.Add(Val);
    }

    public void Remove(string Val){
            lock(Data) Data.Remove(Val);
    }
}

有没有更好的解决方案,能够直接访问字段并防止多个线程同时访问?


13
当然,让它保密。 - Hans Passant
3
从并发性的角度来看,除了 Data 字段是公共的之外,我没有发现你做得有什么问题!如果需要考虑读取性能,则可以使用 ReaderWriterLockSlim 来获得更好的读取性能。http://msdn.microsoft.com/zh-cn/library/system.threading.readerwriterlockslim.aspx - Allan Elder
@AllanElder 当有多个读取者和单个编写者时, ReaderWriterLock 将非常有用(高效)。我们需要知道是否对 OP 适用此情况。 - Sriram Sakthivel
5
当前实现并非真正的“并发”,它只是线程安全的。 - undefined
可能是如何在.Net中实现ConcurrentHashSet的重复问题。 - Ruben Bartelink
显示剩余2条评论
8个回答

241

您的实现是正确的。不幸的是,.NET Framework没有提供内置的并发哈希集类型,但是有一些解决方法。

ConcurrentDictionary(推荐)

第一个解决方法是使用命名空间为 System.Collections.Concurrent 的类ConcurrentDictionary<TKey, TValue>。在这种情况下,值是无意义的,因此我们可以使用简单的 byte(1个字节在内存中)。

private ConcurrentDictionary<string, byte> _data;

这是推荐的选项,因为该类型是线程安全的,并且提供了与 HashSet<T> 相同的优点,除了键和值是不同的对象。
来源:Social MSDN 自我实现
最后,您可以像您所做的那样,使用锁或其他.NET提供的方式来实现自己的数据类型以实现线程安全。这里有一个很好的例子:如何在.Net中实现ConcurrentHashSet 这种解决方案唯一的缺点是,即使是读取操作,类型HashSet<T>也没有官方的并发访问支持。
我引用链接帖子的代码(最初由Ben Mosher编写)。
using System;
using System.Collections.Generic;
using System.Threading;

namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();

        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public void Clear()
        {
            _lock.EnterWriteLock();
            try
            {
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public bool Contains(T item)
        {
            _lock.EnterReadLock();
            try
            {
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }

        public bool Remove(T item)
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public int Count
        {
            get
            {
                _lock.EnterReadLock();
                try
                {
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion

        #region Dispose
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
        protected virtual void Dispose(bool disposing)
        {
            if (disposing)
                if (_lock != null)
                    _lock.Dispose();
        }
        ~ConcurrentHashSet()
        {
            Dispose(false);
        }
        #endregion
    }
}

编辑: 将进入锁方法移至try块外部, 因为它们可能会抛出异常并执行finally块中包含的指令。

ConcurrentBag (不建议使用)

不建议使用ConcurrentBag<T>,因为该类型只允许以线程安全的方式插入一个给定元素并从集合中删除一个随机元素。这个类是设计用于促进生产者和消费者场景,这不是 OP 所追求的(更多解释请参见此处)。

其他操作(例如扩展方法提供的)不支持并发使用。MSDN 文档警告: "ConcurrentBag 的所有公共和受保护成员都是线程安全的,并且可以从多个线程同时使用。但是,通过 ConcurrentBag 实现的接口之一访问的成员(包括扩展方法)不保证是线程安全的,可能需要由调用方同步。"


9
一本含有垃圾值的字典是一个列表。 - Ralf
65
好的,它是一个集合,而不是列表,因为它是无序的。 - Servy
17
根据 MSDN 对于“集合和同步(线程安全)”的相对简短的文档,System.Collections 和相关命名空间中的类可以被多个线程安全地读取。这意味着 HashSet 可以被多个线程安全地读取。 - Hank Schultz
9
@Oliver,即使是null引用(在32位运行时需要4个字节,在64位运行时需要8个字节),参考文献每个条目使用的内存也更多。因此,使用byte、空结构体或类似的东西可能会减少内存占用(如果运行时将数据对齐到本机内存边界以提高访问速度,则可能不会减少)。 - Lucero
13
自我实现不是ConcurrentHashSet,而是ThreadSafeHashSet。这两者之间有很大的区别,这就是为什么微软放弃了SynchronizedCollections(人们搞错了)。为了实现“Concurrent”,需要实现GetOrAdd等操作(类似于字典),否则在没有额外锁定的情况下无法确保并发性。但如果您需要在类外部进行额外的锁定,那么为什么不从一开始就使用一个简单的HashSet呢? - George Mavritsakis
显示剩余23条评论

60

我创造了一个真正的基于ConcurrentDictionaryConcurrentHashSet,而不是包装一个ConcurrentDictionary或锁定一个HashSet

在我的实现中,每个项目支持基本操作,但没有HashSet的集合操作,因为在并发场景下它们的意义较小(在我看来):

var concurrentHashSet = new ConcurrentHashSet<string>(
    new[]
    {
        "hamster",
        "HAMster",
        "bar",
    },
    StringComparer.OrdinalIgnoreCase);

concurrentHashSet.TryRemove("foo");

if (concurrentHashSet.Contains("BAR"))
{
    Console.WriteLine(concurrentHashSet.Count);
}

输出:2

你可以从NuGet 这里获取它,并在GitHub上查看源代码这里


3
这应该是被接受的答案,实现得很好。 - smirkingman
应该将 Add 重命名为 TryAdd,以使其与 ConcurrentDictionary 保持一致吗? - Neo
10
@Neo 不是因为意图使用 HashSet<T> 的语义,即调用 Add 并返回一个布尔值,指示是否已添加该项(true),还是它已经存在(false)。https://msdn.microsoft.com/zh-cn/library/bb353005(v=vs.110).aspx - G-Mac
它不应该实现ISet<T>接口并且与HashSet<T>的语义相匹配吗? - Jakoss
2
@Nekromancer,正如我在答案中所说,我认为在并发实现中提供这些set方法是没有意义的。例如,“Overlaps”需要在其运行期间锁定该实例,否则提供的答案可能已经错误。在我看来,这两种选择都不好(并且可以由消费者在外部添加)。 - i3arnon
一个可能缺失的方法(虽然很少需要)是HashSet.TryGetValue方法。这里有一个类似功能被请求的问题。 - Theodor Zoulias

30

由于没有其他人提到,我将提供一种可能适合您特定用途的替代方法:

Microsoft Immutable Collections

根据微软团队的 博客文章:

虽然创建并发应用程序比以往任何时候都更容易,但其中一个根本问题仍然存在: 可变共享状态。从多个线程读取通常非常容易,但一旦状态需要更新,它就会变得更加困难,特别是在需要锁定的设计中。

与锁定相对的方案是利用不可变状态。不可变数据结构保证永远不会改变,并且因此可以在不担心干扰他人的情况下自由地在不同的线程之间传递。

这种设计引起了一个新问题:如何管理状态中的更改而又无需每次复制整个状态?当涉及集合时,这尤其棘手。

这就是不可变集合的作用所在。

这些集合包括 ImmutableHashSet<T>ImmutableList<T>

性能

由于不可变集合使用树型数据结构来实现结构共享,它们的性能特征与可变集合不同。将其与锁定的可变集合进行比较时,结果将取决于锁争用和访问模式。但是,从另一篇有关不可变集合的 博客文章中可以得知:

问:我听说不可变集合很慢。这些集合有什么不同?在重视性能或内存时可以使用吗?

A: 这些不可变集合已经被高度调整,以在平衡内存共享时拥有与可变集合相当的性能特征。在某些情况下,它们在算法和实际时间上几乎和可变集合一样快,有时甚至更快,但在其他情况下,它们的算法更加复杂。然而,在许多情况下,差异将是可以忽略的。通常应该使用最简单的代码完成工作,然后根据需要进行性能调优。当考虑线程安全时,不可变集合有助于编写简单的代码。

换句话说,在许多情况下,差异是不会明显的,您应该选择更简单的选项——对于并发集合,这将是使用ImmutableHashSet<T>,因为您没有现有的锁定可变实现! :-)


5
如果您的意图是从多个线程更新共享状态,那么 ImmutableHashSet<T> 并没有什么帮助,或者我理解的有误吗? - tugberk
11
@tugberk 是和不是。因为集合是不可变的,所以你必须更新对它的引用,而集合本身并不能帮助你完成这个任务。好消息是,你将一个多线程共享数据结构的复杂问题简化为了更简单的共享引用的问题。该库为你提供了 ImmutableInterlocked.Update 方法来帮助你解决这个问题。 - Søren Boisen
2
@SørenBoisen刚刚了解了不可变集合,并尝试弄清如何在线程安全的情况下使用它们。ImmutableInterlocked.Update似乎是缺失的关键。谢谢! - xneg

4
使一个ISet<T>并发的复杂之处在于集合方法(并集、交集、差集)具有迭代的特性。至少你必须迭代涉及到操作的其中一个集合的所有n个成员,同时锁定这两个集合。
当你必须在迭代过程中锁定整个集合时,就会失去使用ConcurrentDictionary<T,byte>的优势。如果没有锁定,这些操作就不是线程安全的。
考虑到ConcurrentDictionary<T,byte>增加的开销,最好只使用更轻量的HashSet<T> ,并将所有内容都包含在锁内。
如果不需要集合操作,则可以使用ConcurrentDictionary<T,byte>,并在添加键时使用默认值default(byte)

3
基于 ConcurrentDictionary<TKey, TValue> 的解决方案通常具有良好的可扩展性;但是,如果您需要访问 CountKeysValues 属性,或者遍历这些项,它的性能会比单个锁定集合更差。这是因为 ConcurrentDictionary 使用一组锁(默认情况下,它们的数量取决于 CPU 核心数),并且访问这些成员会导致获取所有锁定,因此它的性能越差,您的 CPU 拥有的核心越多。
另一个答案建议使用不可变集合。虽然它们是线程安全的,但仅当您很少向其中添加新项时才表现良好(这总是创建一个新实例,尽管尝试从先前的实例继承尽可能多的节点),但即使在这种情况下,它们通常也有较差的性能。
我最终选择了另一种解决方案(后来我也将其应用于我的ThreadSafeHashSet<T>):与 ConcurrentDictionary 相反,我只使用一个锁,但仅是临时的:不时将新项移动到完全无锁存储中,即使删除并重新添加相同的键也变得无锁,因此非常快。不使用定时器执行这些合并。仅当必须访问锁定存储并且它在“足够长”的时间之前创建时,才会将合并到无锁存储中,这是可配置的

注意:ThreadSafeDictionary<TKey TValue> 类的 备注 部分查看性能比较表(它具有与 ThreadSafeHashSet<T> 相同的特征),以了解是否适合您的需求。如果要自行执行性能测试,则可以在此处查找性能测试的源代码。

源代码可以在这里获取,您也可以下载一个NuGet包


2
我希望使用完整的解决方案,因此我做了这个:请注意,我的计数是以不同的方式实现的,因为我不明白为什么在尝试计算其值时应禁止读取hashset。
@Zen,感谢你开头的工作。
[DebuggerDisplay("Count = {Count}")]
[Serializable]
public class ConcurrentHashSet<T> : ICollection<T>, ISet<T>, ISerializable, IDeserializationCallback
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    private readonly HashSet<T> _hashSet = new HashSet<T>();

    public ConcurrentHashSet()
    {
    }

    public ConcurrentHashSet(IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(comparer);
    }

    public ConcurrentHashSet(IEnumerable<T> collection)
    {
        _hashSet = new HashSet<T>(collection);
    }

    public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> comparer)
    {
        _hashSet = new HashSet<T>(collection, comparer);
    }

    protected ConcurrentHashSet(SerializationInfo info, StreamingContext context)
    {
        _hashSet = new HashSet<T>();

        // not sure about this one really...
        var iSerializable = _hashSet as ISerializable;
        iSerializable.GetObjectData(info, context);
    }

    #region Dispose

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
            if (_lock != null)
                _lock.Dispose();
    }

    public IEnumerator<T> GetEnumerator()
    {
        return _hashSet.GetEnumerator();
    }

    ~ConcurrentHashSet()
    {
        Dispose(false);
    }

    public void OnDeserialization(object sender)
    {
        _hashSet.OnDeserialization(sender);
    }

    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        _hashSet.GetObjectData(info, context);
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    #endregion

    public void Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Add(item);
        }
        finally
        {
            if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void UnionWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.UnionWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void IntersectWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.IntersectWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void ExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        _lock.EnterReadLock();
        try
        {
            _hashSet.ExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            if (_lock.IsReadLockHeld) _lock.ExitReadLock();
        }
    }

    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.SymmetricExceptWith(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSupersetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.IsProperSubsetOf(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Overlaps(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Overlaps(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool SetEquals(IEnumerable<T> other)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.SetEquals(other);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    bool ISet<T>.Add(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void Clear()
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.Clear();
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Contains(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Contains(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public void CopyTo(T[] array, int arrayIndex)
    {
        _lock.EnterWriteLock();
        try
        {
            _hashSet.CopyTo(array, arrayIndex);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public bool Remove(T item)
    {
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Remove(item);
        }
        finally
        {
            if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
        }
    }

    public int Count
    {
        get
        {
            _lock.EnterWriteLock();
            try
            {
                return _hashSet.Count;
            }
            finally
            {
                if(_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }

        }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }
}

锁被处理了...但是内部的哈希集合呢?它的内存什么时候被释放? - David Rettenbacher
1
@Warappa 它是在垃圾回收时释放的。我手动将事物置空并清除它们在类中的全部存在的唯一时间是当主题包含事件并且可能会泄漏内存时(例如,当您使用ObservableCollection及其更改事件时)。如果您可以为我对此主题的理解增加知识,我很乐意听取建议。我已经花了几天时间研究垃圾回收,并且总是对新信息感到好奇。 - Dbl
@AndreasMüller 很好的回答,但我想知道为什么您在一些方法中(如“IntersectWith”)使用 '_lock.EnterWriteLock();' 后跟 '_lock.EnterReadLock();'。我认为这里没有必要使用读锁,因为写锁默认情况下会防止任何读取。 - Jalal Said
如果您总是必须使用EnterWriteLock,那么为什么还需要EnterReadLock呢?它不能用于像Contains这样的方法吗? - ErikE
2
这不是ConcurrentHashSet,而是ThreadSafeHashSet。请参见我的评论@ZenLulz答案中关于自我实现的部分。我99%确定使用这些实现的任何人都会在他们的应用程序中遇到严重的错误。 - George Mavritsakis

1
我发现仅仅锁定System.Collections.Generic.HashSet的添加和删除方法,或者包装框架的ConcurrentDictionary在需要良好性能的“高吞吐量”场景中是不够的。通过使用这个简单的想法,已经可以实现相当不错的性能:
public class ExampleHashSet<T>
{
    const int ConcurrencyLevel = 124;
    const int Lower31BitMask = 0x7FFFFFFF;
    
    HashSet<T>[] sets = new HashSet<T>[ConcurrencyLevel];
    IEqualityComparer<T> comparer;

    public ExampleHashSet()
    {
        comparer = EqualityComparer<T>.Default;

        for(int i = 0; i < ConcurrencyLevel; i++)
            sets[i] = new HashSet<T>();
    }

    public bool Add(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Add(item);
        }
    }

    public bool Remove(T item)
    {
        int hash = (comparer.GetHashCode(item) & Lower31BitMask) % ConcurrencyLevel;
        
        lock(sets[hash]) 
        {
            return sets[hash].Remove(item);
        }
    }

    // further methods ...
}

系统的HashSet被包装了,但与其他答案相比,我们在多个HashSet上持有锁。不同的线程可以在不同的HashSet上“工作”,从而降低总等待时间。
这个想法可以概括并直接实现在HashSet本身中(在桶上持有锁,而不是锁定完整的集合)。一个例子可以在这里找到。

这是一个好主意,前提是你不需要支持枚举。如果你需要枚举,那么它会变得非常低效和混乱。 - Theodor Zoulias

0
对于大多数常见任务,下面的ConcurrentDictionary派生类应该足够了。要获得一个完全成熟的哈希集合,您仍然可以使用ConcurrentDictionary作为后备字段,实现所有HashSet接口,并将调用传递给字典。
简单实现
public class ConcurrentHashSet<T> : ConcurrentDictionary<T, byte>
    where T : notnull
{
    const byte DummyByte = byte.MinValue;

    // For convenience, we add HashSet equivalent APIs here...
    public bool Contains(T item) => ContainsKey(item);
    public bool Add(T item) => TryAdd(item, DummyByte);
    public bool Remove(T item) => TryRemove(item, out _);
}

限制

  1. 不支持 null 值。
  2. 将表现为字典。例如,枚举、序列化...
  3. 缺少 HashSet 特定接口。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接