何时使用BlockingCollection和ConcurrentBag而不是List<T>?

53
4个回答

86

您确实可以使用BlockingCollection,但这样做没有任何意义。

首先要注意的是,BlockingCollection是一个封装了实现了IProducerConsumerCollection<T>的集合的包装器。实现了该接口的任何类型都可以用作底层存储:

创建BlockingCollection<T>对象时,不仅可以指定有界容量,还可以指定要使用的集合类型。例如,您可以为先进先出(FIFO)行为指定ConcurrentQueue<T>对象,或者为后进先出(LIFO)行为指定ConcurrentStack<T>对象。您可以使用实现IProducerConsumerCollection<T>接口的任何集合类。默认的BlockingCollection<T>集合类型为ConcurrentQueue<T>

这包括ConcurrentBag<T>,这意味着您可以拥有一个阻塞的并发包。那么普通的IProducerConsumerCollection<T>和阻塞集合之间有什么区别?BlockingCollection的文档中写道(重点是我的):

BlockingCollection<T>用作IProducerConsumerCollection<T>实例的包装器,允许从集合中尝试移除元素直到有数据可供移除时阻塞。同样,可以创建一个BlockingCollection<T>,以强制执行允许的数据元素数量上限

IProducerConsumerCollection<T>是一个接口,可用于实现生产者-消费者模型的集合。在链接的问题中既不需要这些功能,因此使用BlockingCollection只是增加了一个未使用的功能层。

3
@Jon,谢谢你,这对我很有帮助,让我从愚蠢的状态中摆脱出来,并停止浪费时间学习ConcurrentBag和BlockingCollection,而我实际上需要的是ConcurrentDictionary。 - Fulproof

34
  • List<T> 是一个设计用于单线程应用程序的集合。

  • ConcurrentBag<T>Collections.Concurrent 命名空间中的一个类,旨在简化在多线程环境中使用集合。如果您使用 ConcurrentCollection<T>,您将不必锁定集合以防止其他线程的损坏。您可以在不需要编写特殊锁定代码的情况下向集合插入或获取数据。

  • BlockingCollection<T> 的设计目的是消除在线程之间共享的集合中检查是否有新数据可用的要求。如果有新数据插入到共享集合中,则您的消费者线程将立即唤醒。因此,您不必在一段时间间隔内定期检查消费者线程是否有新数据可用,通常是在 while 循环中。


1
我没有看到 ConcurrentCollection<T> 的类。 从反编译器来看:public class ConcurrentBag<T> :IProducerConsumerCollection<T>,IEnumerable<T>,IEnumerable,ICollection,IReadOnlyCollection<T> - C. Tewalt
1
我仍然给了+1——这有助于消费者线程在BlockingCollection<T>上唤醒的澄清。谢谢! - C. Tewalt
嗨@Ahmet Arslan,感谢您分享的伟大知识,我也想使用Blocking collection,因为我不想在一段时间内检查新数据是否可供消费者线程使用,通常是在while循环中,但我不知道如何实现它。您能否请帮我提供一些代码片段? - Dreamer

20
每当你需要一个线程安全的List时,在大多数情况下,ConcurrentBag和BlockingCollection都不是你最好的选择。这两个集合都专门用于促进生产者-消费者场景,所以除非你有多个线程同时向集合中添加和删除项目,否则你应该寻找其他选项(在大多数情况下,最佳选择是ConcurrentQueue)。
关于特别提到的ConcurrentBag<T>,它是一个针对混合生产者-消费者场景的极其专门化的类。这意味着每个工作线程都被期望成为生产者和消费者(从同一个集合中添加移除项目)。它可能ObjectPool类的内部存储的一个很好的选择,但除此之外,很难想象出任何有利的使用场景。
人们通常认为ConcurrentBag<T>List<T>的线程安全等价物,但事实并非如此。这两个API的相似性是误导性的。向List<T>调用Add会在列表末尾添加一个项目。而向ConcurrentBag<T>调用Add会将项目添加到袋子内的随机位置。ConcurrentBag<T>本质上是无序的。它在被枚举时没有进行优化,当被命令执行时表现糟糕。它在内部维护了一组线程本地队列,因此其内容的顺序由哪个线程执行了什么操作来决定,而不是发生了什么时候。在枚举ConcurrentBag<T>之前,所有这些线程本地队列都会被复制到一个数组中,给垃圾收集器增加了压力(源代码)。因此,例如var item = bag.First();这一行会导致整个集合被复制,只返回一个元素。
这些特性使得ConcurrentBag<T>不是存储Parallel.For/Parallel.ForEach循环结果的理想选择。 List<T>.Add的更好的线程安全替代方法是ConcurrentQueue<T>.Enqueue方法。"Enqueue"这个词比"Add"不太常见,但它实际上做你期望它做的事情。 ConcurrentBag<T>无法做到ConcurrentQueue<T>不能做的事情。例如,这两个集合都没有提供一种从集合中移除特定项的方法。如果您想要一个具有TryRemove方法和key参数的并发集合,您可以查看ConcurrentDictionary<K,V>类。 ConcurrentBag<T>在微软文档中与任务并行库相关的示例中经常出现。比如这里。文档的撰写者显然更加重视使用Add而不是Enqueue的微小可用性优势,而忽视了使用错误集合所带来的行为/性能劣势。考虑到这些示例是在TPL刚出现时编写的,目标是让大多数不熟悉并行编程的开发人员快速采用该库,这种做法有一定道理。我明白,第一次看到Enqueue这个词可能会让人感到害怕。不幸的是,现在有一整代开发人员已经将ConcurrentBag<T>纳入他们的心智工具中,尽管考虑到这个集合的特殊性,它并不适合在那里存在。
如果你想按照源元素的顺序收集Parallel.ForEach循环的结果,你可以使用一个用lock保护的List<T>。在大多数情况下,开销将是可以忽略的,特别是如果循环内的工作量较大。下面是一个示例:
List<TResult> results = new();

Parallel.ForEach(source, parallelOptions, (item, state, index) =>
{
    TResult result = GetResult(item);
    lock (results)
    {
        while (results.Count <= index) results.Add(default);
        results[(int)index] = result;
    }
});

这是针对源代码是未知大小的延迟序列的情况。如果您事先知道其大小,那就更简单了。只需预先分配一个TResult[]数组,并在并行环境中进行更新而无需锁定。
TResult[] results = new TResult[source.Count];

Parallel.ForEach(source, parallelOptions, (item, state, index) =>
{
    results[index] = GetResult(item);
});

TPL在任务执行结束时包含内存屏障,因此当前线程可以看到results数组的所有值(citation)。

3

是的,你可以使用BlockingCollection来实现。定义finishedProxies如下:

BlockingCollection<string> finishedProxies = new BlockingCollection<string>();

要添加一个项目,您需要写入:

finishedProxies.Add(checkResult);

完成后,您可以从内容创建列表。


2
Jim Mischel,我正在阅读你的.NET参考指南。但愿我早点能找到它,这样就更符合我的实际需求了。 - Fulproof

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接