检查阻止集合中的重复项

4

在尝试添加新项之前,检查BlockingCollection中是否存在某个项的最佳方法是什么?基本上,我不希望将重复项添加到BlockingCollection中。

2个回答

6

你需要实现自己的 IProducerConsumerCollection<T>,它的行为类似于集合(例如,不允许重复项)。这里是一个使用临界区(C# lock)使其线程安全的简单版本。对于高并发情况,你可以像 ConcurrentQueue<T> 一样使用 SpinWait 类来提高性能。

public class ProducerConsumerSet<T> : IProducerConsumerCollection<T> {

  readonly object gate = new object();

  readonly Queue<T> queue = new Queue<T>();

  readonly HashSet<T> hashSet = new HashSet<T>();

  public void CopyTo(T[] array, int index) {
    if (array == null)
      throw new ArgumentNullException("array");
    if (index < 0)
      throw new ArgumentOutOfRangeException("index");
    lock (gate)
      queue.CopyTo(array, index);
  }

  public bool TryAdd(T item) {
    lock (gate) {
      if (hashSet.Contains(item))
        return false;
      queue.Enqueue(item);
      hashSet.Add(item);
      return true;
    }
  }

  public bool TryTake(out T item) {
    lock (gate) {
      if (queue.Count == 0) {
        item = default(T);
        return false;
      }
      item = queue.Dequeue();
      hashSet.Remove(item);
      return true;
    }
  }

  public T[] ToArray() {
    lock (gate)
      return queue.ToArray();
  }

  public void CopyTo(Array array, int index) {
    if (array == null)
      throw new ArgumentNullException("array");
    lock (gate)
      ((ICollection) queue).CopyTo(array, index);
  }

  public int Count {
    get { return queue.Count; }
  }

  public object SyncRoot {
    get { return gate; }
  }

  public bool IsSynchronized {
    get { return true; }
  }

  public IEnumerator<T> GetEnumerator() {
    List<T> list = null;
    lock (gate)
      list = queue.ToList();
    return list.GetEnumerator();
  }

  IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
  }

}

如果需要的话,您可以详细说明此类以通过提供可选的 IEqualityComparer<T> 来定制相等性,然后用于初始化 HashSet<T>IProducerConsumerCollection<T>.Add 方法在尝试插入重复项时返回 false。这会导致 BlockingCollection<T>.Add 方法引发 InvalidOperationException,所以您可能需要将添加项目的代码包装到类似以下内容的代码中:
bool AddItem<T>(BlockingCollection<T> blockingCollection, T item) {
  try {
    blockingCollection.Add(item);
    return true;
  }
  catch (InvalidOperationException) {
    return false;
  }
}

请注意,如果您向已完成的集合中添加项,则还会收到一个 InvalidOperationException 异常,并且您需要检查异常消息以确定异常的根本原因。

1
我发现这种方法存在问题,如果生产者添加了重复的项,会导致阻塞集合无法达到上限并引发InvalidOperationException异常。 - Aashish Upadhyay
@AashishUpadhyay,问题是由于BlockingCollection<T>类中的一个错误引起的,这个问题将在.NET 8中得到修复。目前的解决方法是在TryAdd返回false时抛出异常。我已经发布了一个包含此解决方法的IProducerConsumerCollection<T>实现链接. - Theodor Zoulias

-3
使用TryAdd(data)方法。您还可以传递一个timespan对象或一个表示超时期限的int。返回truefalse。 请注意,如果基础集合类型无法处理重复项,并且您要添加的数据是重复项,则会引发InvalidOperationException异常。

2
所有底层(IProdCons)类都接受重复项,因此我认为仅靠这个不会有所帮助。 - H H

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接