How to implement ConcurrentHashSet in .NET

38

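I am trying to implement a ConcurrentHashSet in the spirit of ConcurrentDictionary. My approach is to use an internal backing ConcurrentDictionary and write small delegating methods. Where I am stuck is the set-theoretic methods; in particular, I am not sure whether I can use foreach without violating concurrency.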

public class ConcurrentHashSet<TElement> : ISet<TElement>
{
    private readonly ConcurrentDictionary<TElement, object> _internal;

    public ConcurrentHashSet(IEnumerable<TElement> elements = null)
    {
        _internal = new ConcurrentDictionary<TElement, object>();
        if (elements != null)
            UnionWith(elements);
    }

    public void UnionWith(IEnumerable<TElement> other)
    {
        if (other == null) throw new ArgumentNullException("other");

        foreach (var otherElement in other)
            Add(otherElement);
    }

    public void IntersectWith(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public void ExceptWith(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public void SymmetricExceptWith(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public bool IsSubsetOf(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public bool IsSupersetOf(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public bool IsProperSupersetOf(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public bool IsProperSubsetOf(IEnumerable<TElement> other)
    {
        throw new NotImplementedException();
    }

    public bool Overlaps(IEnumerable<TElement> other)
    {
        return other.Any(otherElement => _internal.ContainsKey(otherElement));
    }

    public bool SetEquals(IEnumerable<TElement> other)
    {
        int otherCount = 0;
        int thisCount = Count;
        foreach (var otherElement in other)
        {
            otherCount++;
            if (!_internal.ContainsKey(otherElement))
                return false;
        }
        return otherCount == thisCount;
    }

    public bool Add(TElement item)
    {
        return _internal.TryAdd(item, null);
    }

    public void Clear()
    {
        _internal.Clear();
    }

    // I am not sure whether this fulfills the contract correctly
    void ICollection<TElement>.Add(TElement item)
    {
        Add(item);
    }

    public bool Contains(TElement item)
    {
        return _internal.ContainsKey(item);
    }

    public void CopyTo(TElement[] array, int arrayIndex)
    {
        _internal.Keys.CopyTo(array, arrayIndex);
    }

    public bool Remove(TElement item)
    {
        object ignore;
        return _internal.TryRemove(item, out ignore);
    }

    public int Count
    {
        get { return _internal.Count; }
    }

    public bool IsReadOnly
    {
        get { return false; }
    }

    public IEnumerator<TElement> GetEnumerator()
    {
        return _internal.Keys.GetEnumerator();
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }
}
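
On the foreach concern: enumerating a ConcurrentDictionary while other threads modify it is documented as safe (it does not throw), although it is not a moment-in-time snapshot. Under that assumption, two of the unimplemented mutating methods could be sketched roughly as below. The helper class `ConcurrentSetOps` and its method shapes are hypothetical, not part of the question, and neither operation is atomic as a whole.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper; one possible shape for the missing methods.
public static class ConcurrentSetOps
{
    // Keeps only the keys of 'set' that also occur in 'other'.
    public static void IntersectWith<T, TValue>(ConcurrentDictionary<T, TValue> set, IEnumerable<T> other)
    {
        if (set == null) throw new ArgumentNullException("set");
        if (other == null) throw new ArgumentNullException("other");

        // Copy 'other' once so each lookup below is O(1).
        // Assumes the dictionary uses the default equality comparer.
        var keep = new HashSet<T>(other);

        // Safe during concurrent writes, but items added by other threads
        // while this loop runs may or may not be visited.
        foreach (var pair in set)
        {
            if (!keep.Contains(pair.Key))
            {
                TValue ignored;
                set.TryRemove(pair.Key, out ignored);
            }
        }
    }

    // Removes every element of 'other' from 'set'.
    public static void ExceptWith<T, TValue>(ConcurrentDictionary<T, TValue> set, IEnumerable<T> other)
    {
        if (set == null) throw new ArgumentNullException("set");
        if (other == null) throw new ArgumentNullException("other");

        foreach (var item in other)
        {
            TValue ignored;
            set.TryRemove(item, out ignored);
        }
    }
}
```

Each individual TryRemove is thread-safe, but another thread can add or remove elements between two iterations, so the final contents reflect an interleaving rather than a single point in time.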

1
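Trying to buy a slight concurrency improvement at the cost of HashSet's performance makes no sense. - Hans Passant
@Hans, would performance also suffer if I only use Add, Remove, and Contains? Also, I think I could iterate over the set while another thread is adding elements without any problems. - Sebastian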
1
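If you don't mind that it does not implement ISet<T> (and you don't like the ConcurrentDictionary-based solution), you can try this ThreadSafeHashSet<T> implementation (disclaimer: written by me). Its characteristics differ slightly from those of ConcurrentDictionary<TKey, TValue> (for example, accessing Count is not so terribly slow). See my answer to this similar question for more details, which I don't want to copy and paste here. - György Kőszeg
4 Answers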

33

I just ran into a similar situation ("I am interested in fast Add, Contains, and Remove") and implemented the following:

using System;
using System.Collections.Generic;
using System.Threading;

namespace BlahBlah.Utilities
{
    public class ConcurrentHashSet<T> : IDisposable
    {
        private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
        private readonly HashSet<T> _hashSet = new HashSet<T>();

        #region Implementation of ICollection<T> ...ish
        public bool Add(T item)
        {
            try
            {
                _lock.EnterWriteLock();
                return _hashSet.Add(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public void Clear()
        {
            try
            {
                _lock.EnterWriteLock();
                _hashSet.Clear();
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public bool Contains(T item)
        {
            try
            {
                _lock.EnterReadLock();
                return _hashSet.Contains(item);
            }
            finally
            {
                if (_lock.IsReadLockHeld) _lock.ExitReadLock();
            }
        }

        public bool Remove(T item)
        {
            try
            {
                _lock.EnterWriteLock();
                return _hashSet.Remove(item);
            }
            finally
            {
                if (_lock.IsWriteLockHeld) _lock.ExitWriteLock();
            }
        }

        public int Count
        {
            get
            {
                try
                {
                    _lock.EnterReadLock();
                    return _hashSet.Count;
                }
                finally
                {
                    if (_lock.IsReadLockHeld) _lock.ExitReadLock();
                }
            }
        }
        #endregion

        #region Dispose
        public void Dispose()
        {
            if (_lock != null) _lock.Dispose();
        }
        #endregion
    }
}

I haven't really tested it (for performance or reliability). YMMV.
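
The wrapper above does not expose enumeration. One possible way to add it, sketched here under the assumption that a point-in-time copy is acceptable, is to copy the contents while holding the read lock. The class name `LockedHashSet<T>` and its `ToArray` method are hypothetical. The sketch also acquires the lock before entering `try`, a common alternative to checking `IsWriteLockHeld` in `finally`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of the wrapper above, reduced to Add plus a snapshot.
public sealed class LockedHashSet<T> : IDisposable
{
    private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
    private readonly HashSet<T> _hashSet = new HashSet<T>();

    public bool Add(T item)
    {
        // Acquire before 'try' so that 'finally' only runs when the lock is held.
        _lock.EnterWriteLock();
        try
        {
            return _hashSet.Add(item);
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    // Copies the current contents under the read lock. Callers can then
    // enumerate the returned array without blocking writers.
    public T[] ToArray()
    {
        _lock.EnterReadLock();
        try
        {
            var copy = new T[_hashSet.Count];
            _hashSet.CopyTo(copy);
            return copy;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    public void Dispose()
    {
        _lock.Dispose();
    }
}
```

The returned array can go stale immediately after the call, which is the usual trade-off for snapshot-style enumeration.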


2
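Note that performance will be much better if you mostly make Contains calls. Add, Clear, and Remove all require an exclusive lock, so you lose a lot of concurrency if you send too many of those. - Ben Mosher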
1
@AlirezaNoori, wouldn't a simple public void Dispose() { _lock.Dispose(); } be enough for disposal? - Oleks
3
@AlirezaNoori, since ReaderWriterLockSlim is managed, there is no need for a finalizer here (reference link). - Oleks
2
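Yes, I agree with you. I usually write my Dispose methods this way because it is more convenient for future implementations. Indeed, it is not needed here (which is what you asked, rather than me telling you what you already know from the link you posted). I thought you were asking whether those three functions are unnecessary in general. It is not needed here, but what if someone adds an unmanaged resource later? That is why I always write my Dispose methods this way. - Alireza Noori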
2
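@AlirezaNoori, you are only making finalization more expensive in order to copy the "Dispose pattern" anti-pattern. At least in this case it does not go as far as being a bug, but the only thing a finalizer that finalizes nothing contributes to a class is a new place for bugs to occur. - Jon Hanna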

15
Here is an implementation of a concurrent set based on ConcurrentDictionary:

public class ConcurrentSet<T> : IEnumerable<T>, ISet<T>, ICollection<T>
{
    private readonly ConcurrentDictionary<T, byte> _dictionary = new ConcurrentDictionary<T, byte>();

    /// <summary>
    /// Returns an enumerator that iterates through the collection.
    /// </summary>
    /// <returns>
    /// A <see cref="T:System.Collections.Generic.IEnumerator`1"/> that can be used to iterate through the collection.
    /// </returns>
    public IEnumerator<T> GetEnumerator()
    {
        return _dictionary.Keys.GetEnumerator();
    }

    /// <summary>
    /// Returns an enumerator that iterates through a collection.
    /// </summary>
    /// <returns>
    /// An <see cref="T:System.Collections.IEnumerator"/> object that can be used to iterate through the collection.
    /// </returns>
    IEnumerator IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>
    /// Removes the first occurrence of a specific object from the <see cref="T:System.Collections.Generic.ICollection`1"/>.
    /// </summary>
    /// <returns>
    /// true if <paramref name="item"/> was successfully removed from the <see cref="T:System.Collections.Generic.ICollection`1"/>; otherwise, false. This method also returns false if <paramref name="item"/> is not found in the original <see cref="T:System.Collections.Generic.ICollection`1"/>.
    /// </returns>
    /// <param name="item">The object to remove from the <see cref="T:System.Collections.Generic.ICollection`1"/>.</param><exception cref="T:System.NotSupportedException">The <see cref="T:System.Collections.Generic.ICollection`1"/> is read-only.</exception>
    public bool Remove(T item)
    {
        return TryRemove(item);
    }

    /// <summary>
    /// Gets the number of elements in the set.
    /// </summary>
    public int Count
    {
        get { return _dictionary.Count; }
    }

    /// <summary>
    /// Gets a value indicating whether the <see cref="T:System.Collections.Generic.ICollection`1"/> is read-only.
    /// </summary>
    /// <returns>
    /// true if the <see cref="T:System.Collections.Generic.ICollection`1"/> is read-only; otherwise, false.
    /// </returns>
    public bool IsReadOnly { get { return false; } }

    /// <summary>
    /// Gets a value that indicates if the set is empty.
    /// </summary>
    public bool IsEmpty
    {
        get { return _dictionary.IsEmpty; }
    }

    public ICollection<T> Values
    {
        get { return _dictionary.Keys; }
    }

    /// <summary>
    /// Adds an item to the <see cref="T:System.Collections.Generic.ICollection`1"/>.
    /// </summary>
    /// <param name="item">The object to add to the <see cref="T:System.Collections.Generic.ICollection`1"/>.</param><exception cref="T:System.NotSupportedException">The <see cref="T:System.Collections.Generic.ICollection`1"/> is read-only.</exception>
    void ICollection<T>.Add(T item)
    {
        if(!Add(item))
            throw new ArgumentException("Item already exists in set.");
    }

    /// <summary>
    /// Modifies the current set so that it contains all elements that are present in the current set, in the specified collection, or in both.
    /// </summary>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void UnionWith(IEnumerable<T> other)
    {
        foreach (var item in other)
            TryAdd(item);
    }

    /// <summary>
    /// Modifies the current set so that it contains only elements that are also in a specified collection.
    /// </summary>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void IntersectWith(IEnumerable<T> other)
    {
        var enumerable = other as IList<T> ?? other.ToArray();
        foreach (var item in this)
        {
            if (!enumerable.Contains(item))
                TryRemove(item);
        }
    }

    /// <summary>
    /// Removes all elements in the specified collection from the current set.
    /// </summary>
    /// <param name="other">The collection of items to remove from the set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void ExceptWith(IEnumerable<T> other)
    {
        foreach (var item in other)
            TryRemove(item);
    }

    /// <summary>
    /// Modifies the current set so that it contains only elements that are present either in the current set or in the specified collection, but not both. 
    /// </summary>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public void SymmetricExceptWith(IEnumerable<T> other)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Determines whether a set is a subset of a specified collection.
    /// </summary>
    /// <returns>
    /// true if the current set is a subset of <paramref name="other"/>; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool IsSubsetOf(IEnumerable<T> other)
    {
        var enumerable = other as IList<T> ?? other.ToArray();
        return this.AsParallel().All(enumerable.Contains);
    }

    /// <summary>
    /// Determines whether the current set is a superset of a specified collection.
    /// </summary>
    /// <returns>
    /// true if the current set is a superset of <paramref name="other"/>; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool IsSupersetOf(IEnumerable<T> other)
    {
        return other.AsParallel().All(Contains);
    }

    /// <summary>
    /// Determines whether the current set is a proper (strict) superset of a specified collection.
    /// </summary>
    /// <returns>
    /// true if the <see cref="T:System.Collections.Generic.ISet`1"/> object is a proper superset of <paramref name="other"/>; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set. </param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool IsProperSupersetOf(IEnumerable<T> other)
    {
        var enumerable = other as IList<T> ?? other.ToArray();
        return this.Count != enumerable.Count && IsSupersetOf(enumerable);
    }

    /// <summary>
    /// Determines whether the current set is a proper (strict) subset of a specified collection.
    /// </summary>
    /// <returns>
    /// true if the current set is a proper subset of <paramref name="other"/>; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool IsProperSubsetOf(IEnumerable<T> other)
    {
        var enumerable = other as IList<T> ?? other.ToArray();
        return Count != enumerable.Count && IsSubsetOf(enumerable);
    }

    /// <summary>
    /// Determines whether the current set overlaps with the specified collection.
    /// </summary>
    /// <returns>
    /// true if the current set and <paramref name="other"/> share at least one common element; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool Overlaps(IEnumerable<T> other)
    {
        return other.AsParallel().Any(Contains);
    }

    /// <summary>
    /// Determines whether the current set and the specified collection contain the same elements.
    /// </summary>
    /// <returns>
    /// true if the current set is equal to <paramref name="other"/>; otherwise, false.
    /// </returns>
    /// <param name="other">The collection to compare to the current set.</param><exception cref="T:System.ArgumentNullException"><paramref name="other"/> is null.</exception>
    public bool SetEquals(IEnumerable<T> other)
    {
        var enumerable = other as IList<T> ?? other.ToArray();
        return Count == enumerable.Count && enumerable.AsParallel().All(Contains);
    }

    /// <summary>
    /// Adds an element to the current set and returns a value to indicate if the element was successfully added. 
    /// </summary>
    /// <returns>
    /// true if the element is added to the set; false if the element is already in the set.
    /// </returns>
    /// <param name="item">The element to add to the set.</param>
    public bool Add(T item)
    {
        return TryAdd(item);
    }

    public void Clear()
    {
        _dictionary.Clear();
    }

    public bool Contains(T item)
    {
        return _dictionary.ContainsKey(item);
    }

    /// <summary>
    /// Copies the elements of the <see cref="T:System.Collections.Generic.ICollection`1"/> to an <see cref="T:System.Array"/>, starting at a particular <see cref="T:System.Array"/> index.
    /// </summary>
    /// <param name="array">The one-dimensional <see cref="T:System.Array"/> that is the destination of the elements copied from <see cref="T:System.Collections.Generic.ICollection`1"/>. The <see cref="T:System.Array"/> must have zero-based indexing.</param><param name="arrayIndex">The zero-based index in <paramref name="array"/> at which copying begins.</param><exception cref="T:System.ArgumentNullException"><paramref name="array"/> is null.</exception><exception cref="T:System.ArgumentOutOfRangeException"><paramref name="arrayIndex"/> is less than 0.</exception><exception cref="T:System.ArgumentException"><paramref name="array"/> is multidimensional.-or-The number of elements in the source <see cref="T:System.Collections.Generic.ICollection`1"/> is greater than the available space from <paramref name="arrayIndex"/> to the end of the destination <paramref name="array"/>.-or-Type <paramref name="T"/> cannot be cast automatically to the type of the destination <paramref name="array"/>.</exception>
    public void CopyTo(T[] array, int arrayIndex)
    {
        Values.CopyTo(array, arrayIndex);
    }

    public T[] ToArray()
    {
        return _dictionary.Keys.ToArray();
    }

    public bool TryAdd(T item)
    {
        return _dictionary.TryAdd(item, default(byte));
    }

    public bool TryRemove(T item)
    {
        byte donotcare;
        return _dictionary.TryRemove(item, out donotcare);
    }
}
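
SymmetricExceptWith is left unimplemented above. A minimal sketch of one way to fill it in, written as a hypothetical extension method on the same kind of backing dictionary (the name `ConcurrentSetExtensions` is an assumption, not part of the answer): each distinct item of the other collection is removed if present and added otherwise. Like the other set operations here, it is not atomic as a whole.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical extension; operates on the ConcurrentDictionary<T, byte>
// that backs the set rather than on the set class itself.
public static class ConcurrentSetExtensions
{
    public static void SymmetricExceptWith<T>(this ConcurrentDictionary<T, byte> set, IEnumerable<T> other)
    {
        if (set == null) throw new ArgumentNullException("set");
        if (other == null) throw new ArgumentNullException("other");

        // De-duplicate first; otherwise an item that appears twice in
        // 'other' would be toggled twice and end up in its original state.
        foreach (var item in new HashSet<T>(other))
        {
            byte ignored;
            if (!set.TryRemove(item, out ignored))
                set.TryAdd(item, default(byte));
        }
    }
}
```

Between the failed TryRemove and the TryAdd another thread may add the same item; in that case TryAdd simply returns false and the item stays in the set.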

2
Shouldn't you use Keys.CopyTo in the CopyTo method? - Itsik
1
Very clever. Upvoted. Now, what to do about those extra bytes ;) - C. Tewalt
1
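The set operations (ExceptWith, UnionWith, etc.) are not thread-safe; another thread may modify the set's contents at the same time, so the result at the end of a set operation may not be what you expect. - Thomas Levesque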
2
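@DavidPfeffer, actually there is; but you need to use an immutable internal data structure, like the BCL immutable collections. I agree that it is not feasible for a mutable data structure like ConcurrentDictionary, though; I was just pointing out the fact. - Thomas Levesque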
2
Yes. Objects have extra information associated with them, used for locking, storing values on the stack, and other things. See the answer here: https://dev59.com/xnNA5IYBdhLWcg3wC5NR - TamusJRoyce
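
The comments above mention immutable internal data structures as a way to make whole set operations atomic. A rough sketch of that idea, assuming the System.Collections.Immutable package is available: every mutation computes a new ImmutableHashSet and swaps it in with ImmutableInterlocked.Update, which retries if another thread changed the reference in the meantime. The wrapper name `AtomicSet<T>` is hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading;

// Hypothetical wrapper: each mutation replaces the whole immutable set.
public sealed class AtomicSet<T>
{
    private ImmutableHashSet<T> _items = ImmutableHashSet<T>.Empty;

    // Readers get a consistent, never-changing snapshot.
    public ImmutableHashSet<T> Snapshot
    {
        get { return Volatile.Read(ref _items); }
    }

    // Returns true if the item was added, false if it was already present
    // (ImmutableHashSet.Add returns the same instance in that case).
    public bool Add(T item)
    {
        return ImmutableInterlocked.Update(ref _items, (set, x) => set.Add(x), item);
    }

    // The intersection is applied as one atomic replacement.
    public void IntersectWith(IEnumerable<T> other)
    {
        if (other == null) throw new ArgumentNullException("other");

        // Materialize once: the transformer may run more than once
        // if another thread wins the race to update the reference.
        var otherSet = ImmutableHashSet.CreateRange(other);
        ImmutableInterlocked.Update(ref _items, (set, o) => set.Intersect(o), otherSet);
    }
}
```

The trade-off is allocation: each successful write builds a new set instance, so this tends to suit read-heavy workloads better than write-heavy ones.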

6

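ConcurrentDictionary performs reads in a lock-free way in .NET 4.0+, which gives it better performance characteristics. In heavily multi-threaded scenarios, a set built on ConcurrentDictionary will therefore probably perform better than a ReaderWriterLockSlim wrapper. However, you need to carry an empty byte as a dummy value (I agree, that looks awful).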
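
As a small illustration of the dummy-value approach, the sketch below hammers a ConcurrentDictionary<int, byte> from several threads and counts how many TryAdd calls actually inserted a key. The class `DummyValueDemo` and its method are hypothetical names used only for this example.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of using ConcurrentDictionary<T, byte> as a set.
public static class DummyValueDemo
{
    // Attempts to add each of the keys 0..999 twice, from parallel workers,
    // and returns how many TryAdd calls inserted a new key.
    public static int CountSuccessfulAdds(ConcurrentDictionary<int, byte> set)
    {
        int added = 0;
        Parallel.For(0, 2000, i =>
        {
            // The byte value is a placeholder; only the key matters.
            if (set.TryAdd(i % 1000, default(byte)))
                Interlocked.Increment(ref added);
        });
        return added;
    }
}
```

Starting from an empty dictionary, exactly one of the two attempts per key can succeed, so the method returns 1000 regardless of how the work is scheduled.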


2
I have implemented a ConcurrentSet<T> that does exactly this under the hood. http://pastebin.com/8REHRFFL - Brent
1
HashSet has O(1) retrieval. - kerem

4

What do you plan to use it for?

var set1 = new ConcurrentHashSet<int>();
...

if (set1.Overlaps(set2))
{
    set1.IntersectWith(set2);
    assert(! set1.IsEmpty());    // might fail
}

That may be acceptable, but a Set is far less likely to be useful in a concurrent environment than a Queue.
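
The assertion above can fail because another thread may change the set between the Overlaps check and the IntersectWith call. One way to close that gap, sketched here under the assumption that a single coarse lock is acceptable, is to expose the compound step as one operation. The class `SynchronizedSet<T>` and the method `IntersectIfOverlaps` are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: a plain HashSet guarded by one lock, with the
// check-then-act sequence performed as a single atomic step.
public sealed class SynchronizedSet<T>
{
    private readonly object _gate = new object();
    private readonly HashSet<T> _items = new HashSet<T>();

    public bool Add(T item)
    {
        lock (_gate) { return _items.Add(item); }
    }

    public int Count
    {
        get { lock (_gate) { return _items.Count; } }
    }

    // Intersects with 'other' only if the two share at least one element.
    // Returns true if the intersection was applied.
    public bool IntersectIfOverlaps(IEnumerable<T> other)
    {
        if (other == null) throw new ArgumentNullException("other");

        // Enumerate 'other' before taking the lock to keep the critical section short.
        var snapshot = new List<T>(other);

        lock (_gate)
        {
            if (!_items.Overlaps(snapshot))
                return false;

            _items.IntersectWith(snapshot);
            return true; // the set is non-empty here, still under the lock
        }
    }
}
```

The guarantee only holds until the lock is released; a caller that needs to act on the non-empty result would have to do so inside the same critical section.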


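Actually, I don't necessarily need these functions; I am only interested in fast Add, Contains, and Remove (of arbitrary elements), and a queue doesn't seem to offer those. I just didn't want to submit a class full of NotImplementedException. - Sebastian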
1
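Even though this is an old post, I needed the same thing and want to share an idea for solving the problem. A good solution is to add a TryAddM method, similar to ConcurrentDictionary. That way you can resolve the data race. - Boas Enkler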
