ConcurrentBag - 如何添加多个元素?

57

有没有一种方法可以一次性添加多个元素到ConcurrentBag中,而不是一个一个地添加?我没有在ConcurrentBag上看到AddRange()方法,但是有一个Concat()方法。然而,这对我来说不起作用:

ConcurrentBag<T> objectList = new ConcurrentBag<T>();

timeChunks.ForEach(timeChunk =>
{
    List<T> newList = Foo.SomeMethod<T>(x => x.SomeReadTime > timeChunk.StartTime);
    objectList.Concat<T>(newList);
});

这段代码曾经是在Parallel.ForEach()中运行的,但我将其更改为上面这种形式以便进行故障排查。变量newList确实有对象,但在objectList.Concat<>之后,objectList始终没有任何对象。Concat<>不是那样工作的吗?我是否需要使用Add()方法一个一个地向ConcurrentBag中添加项目?

7个回答

82

我知道这是一个老帖子,但我想添加一些内容。

像其他人所说的那样:是的,你需要逐个添加它们。在我的情况下,我添加了一个小的扩展方法来使事情变得更加清晰,但在底层它做的事情是相同的:

public static void AddRange<T>(this ConcurrentBag<T> @this, IEnumerable<T> toAdd)
{
    foreach (var element in toAdd)
    {
        @this.Add(element);
    }
}

然后:
ConcurrentBag<int> ccBag = new ConcurrentBag<int>();
var listOfThings = new List<int>() { 1, 2, 4, 5, 6, 7, 8, 9 };
ccBag.AddRange(listOfThings);

我也尝试使用AsParallel在扩展方法中添加,但是在对各种大小的字符串列表进行加法运算的测试中,使用AsParallel(如此处所示)相比传统的for循环更慢。

public static void AddRange<T>(this ConcurrentBag<T> @this, IEnumerable<T> toAdd)
{
    toAdd.AsParallel().ForAll(t => @this.Add(t));
}

这应该是问题的答案。 - Sailing Judo
这是正确的答案。同时尝试使用Parallel(AsParallel().ForAll())运行测试,但速度较慢。 - Cubelaster
如果有人和我一样对以下问题感兴趣:“将项目并行添加是否有意义,因为接收器是单个集合?” 可以在以下几个答案中找到关于ConcurrentBag实现的有用信息: https://dev59.com/T3I-5IYBdhLWcg3wwbZL#5204488 https://dev59.com/T3I-5IYBdhLWcg3wwbZL#1688904 - user1234567

23

Concat 是由 LINQ 提供的扩展方法。它是一种不可变操作,返回另一个可枚举源集合后紧接着指定集合的 IEnumerable。它不会以任何方式改变源集合。

您需要逐个将项目添加到 ConcurrentBag 中。


12

我遇到了类似的问题,想要并行处理小块数据,因为一个大块数据在发送端访问我的数据的web服务时超时,但我不希望通过串行处理每个块来降低速度。逐条处理数据甚至更慢-由于我调用的服务可以处理批量请求,所以最好提交尽可能多的请求而不会超时。

像Vlad说的那样,将ConcurrentBag拼接到对象类型列表中不会返回ConcurrentBag,因此拼接不起作用!(花了我一些时间才意识到我不能这样做)

请尝试使用以下方法-创建一个List<T>,然后创建一个ConcurrentBag<List<T>>。在每个并行迭代中,它将向ConcurrentBag添加一个新列表。当并行循环完成后,遍历ConcurrentBag并连接(或者如果您想要消除可能的重复项,则进行联合)到您创建的第一个List<T>,以“展平”所有内容到一个列表中。


4
最终请使用 SelectMany 来展开。 - Diogo Luis
在我看来,这应该是被采纳的答案。它描述了原帖作者的最终需求,为什么 concat 不起作用以及一个可靠的解决方案。 - CobaltBlue

2
Concat方法是包含在公共的System.Linq库(内部.NET System.Core程序集)中支持的Enumerable静态类的一种方法。

但是,Concat方法存在限制,无法提供“添加范围”要求到ConcurrentBag<T>对象,其行为如下图所示(在Visual Studio的第47行):

enter image description here

为了使Concat方法匹配“添加范围”要求,需要更新当前ConcurrentBag<T>对象实例;如果程序需要添加多个范围,则需要从当前ConcurrentBag<T>(递归地)自动引用实例来为每个范围添加。

enter image description here

因此,我不使用Concat方法,如果我可以提供建议,我不建议使用。我遵循Eric的答案中的类似示例,从ConcurrentBag<T>派生出一个类,该类使用ConcurrentBag<T>基本方法向派生实例添加IEnumerable<T>项,如下所示:

public class ConcurrentBagCompleted<T> : ConcurrentBag<T>
{
    public ConcurrentBagCompleted() : base() { }

    public ConcurrentBagCompleted(IEnumerable<T> collection):base(collection)
    {
    }

    public void AddRange(IEnumerable<T> collection)
    {
        Parallel.ForEach(collection, item =>
        {
            base.Add(item);
        });
    }
}

0

ConcurrentBag<T>有一个第二个构造函数,它以IEnumerable作为参数并从中构建包。这是我发现的最快、最安全的方法。 请参考ConcurrentBag<T>(IEnumerable<T>) 因此,在问题的情况下,解决方案将是:

            ConcurrentBag<T> objectList = new ConcurrentBag<T>();

        timeChunks.ForEach(timeChunk =>
        {
            List<T> newList = Foo.SomeMethod<T>(x => x.SomeReadTime > timeChunk.StartTime);
            newList.AddRange(objectList.ToList());
            objectList=new ConcurrentBag<T>(newList);
        });

-1
ConcurrentBag<T> bag = new ConcurrentBag<T>();

Parallel.ForEach (items, item => {
    bag.Add(item);
});

1
谢谢回答。仅有代码的答案通常会被看作是不好的做法。您能否添加一些额外的细节来解释为什么这可以解决问题? - D. Foley

-6

是的 :)

Concat 可能是 Enumerable 扩展中的一个。它不会向 ConcurrentBag 添加任何内容,只是返回一些包含原始 bag 和您尝试添加的任何内容的奇怪对象。

请注意,Concat 的结果不再是 ConcurrentBag,因此您不想使用它。它是通用 LINQ 框架的一部分,使得组合不可变序列成为可能。当然,这个框架并不试图将操作数的并发属性扩展到结果,因此生成的对象不适合多线程访问。

(基本上,Concat 适用于 ConcurrentBag,因为它公开了 IEnumerable<T> 接口。)


2
这个“答案”显然没有回答问题。不确定为什么它被标记为答案。 - Sailing Judo
回答哪一个OP问题是"Yes"? - Lucas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接