ConcurrentBag<T>获取重复项(似乎不是线程安全的)

10

我一定在某个地方做错了什么,因为我的concurrentbag中出现了重复的项,以下是事件的发生过程:

  var listings = new ConcurrentBag<JSonListing>();
  Parallel.ForEach(Types, ParallelOptions, (type, state) =>
  {
      ...
      status = ProcessType(listings, status, type, state);
      ....
   });

  private GeocodeResults ProcessType(ConcurrentBag<JSonListing> listings, GeocodeResults status, XElement type, ParallelLoopState state)
  {
      ....
      AddListingsToList(results, type, listings);
      .... 
  }

private void AddListingsToList(dynamic results, XElement type, ConcurrentBag<JSonListing> listings)
    {

        var typeMustMatch = type.Attribute("TypeMustMatch").Value;
        var typeID = Convert.ToInt32(type.Attribute("ID").Value);

        foreach (var result in results.results)
        {


            var foundListing = listings.SingleOrDefault(x => x.ID == result.id);
            if (foundListing != null)
            {
                var typeIDs = foundListing.TypeIDs.Split("|".ToCharArray(), StringSplitOptions.RemoveEmptyEntries).ToList();
                if (!typeIDs.Contains(typeID.ToString()))
                {
                    foundListing.TypeIDs += "|" + typeID;
                }
            }
            else
            {
                var listing = new JSonListing { ID = result.id, ReferenceNumber = result.reference, TypeIDs = typeID.ToString(), TypeMustMatch = typeMustMatch };
                listings.Add(listing);
            }




        }


    }

添加列表应该保证,如果元素已经存在,则不会添加另一个具有相同ID的元素,而是更新某些属性。现在我得到的错误是:

System.InvalidOperationException:序列包含多个匹配的元素
在System.Linq.Enumerable.SingleOrDefault [TSource](IEnumerable`1 source,Func`2 predicate)中
在LocalSearch.Processor.CityProcessor.AddListingsToList(Object results,XElement type,ConcurrentBag`1 listings)中,位于d:\ Projects \ ListingLocator2 \ Code \ LocalSearch.Processor \ Processors.cs: line 310
在CallSite.Target(Closure,CallSite,CityProcessor,Object,XElement,ConcurrentBag`1)中
在LocalSearch.Processor.CityProcessor.ProcessType(ConcurrentBag`1 listings,GeocodeResults status,XElement type,ParallelLoopState state)中,位于d:\ Projects \ ListingLocator2 \ Code \ LocalSearch.Processor \ Processors.cs: line 249
在LocalSearch.Processor.CityProcessor。&lt;&gt; c__DisplayClass4.b__0(XElement type,ParallelLoopState state)中,位于d:\ Projects \ ListingLocator2 \ Code \ LocalSearch.Processor \ Processors.cs: line 137


这个问题可以使用 PLINQ 更加干净地解决。 - spender
@spender:看起来 ProcessType 会有一些重大的副作用,但这在示例中被省略了。 - Jon
1
ProcessType的主要作用是调用Google Places API,返回与搜索查询匹配的动态可用地点列表。由于这些不是内存中的项目,因此PLINQ不适用于此。 - Zoinky
2个回答

20
ConcurrentBag 保证其每个操作本身在多线程环境下是线程安全的,但它不保证连续执行的多个操作作为原子组进行处理。
因此,您的代码存在竞态条件:您检查包是否已经包含某个项目X,但两个线程可以同时运行测试,决定该项不存在并继续添加。结果:两份副本最终都被放入了包中。
看起来您的用例最好使用 ConcurrentDictionary 来实现,并利用 TryAdd 方法,该方法是原子性的。或者可以在包周围放置一个 lock() 来使块内的所有内容操作原子化,但那样就不需要使用并发集合了,而可以直接使用普通的 List

我如何确保没有重复项进入袋子? - Zoinky
我认为最好的方法是使用一个允许重复项的List<T>,完成后遍历该列表。大多数这样的列表大约有4k个项目,因此不会对性能造成很大的影响(至少我认为如此,但我会尝试一下)。 - Zoinky
@Zoinky:你可以尝试很多东西,最好的取决于你的确切要求(我们只能猜测)。与其允许重复项,为什么不使用适当的相等比较器和Set<T>呢?更好的是,既然这里有一个自然键可用,为什么不使用ConcurrentDictionary呢? - Jon
我相信ConcurrentDictionary是我正在寻找的,我的要求非常简单。主要的Parrallel.ForEach有大约20个项目,每个项目将是对Google Places API的不同调用,现在这些类型中的每一个都将返回最多200个结果,类型1、类型2可能会返回相同的项目,但是我只想通过将typeid附加到已经在集合中的项目来跟踪它,所以如果type1和type2带回相同的项目,应该只有一个项目,其中字符串“1 | 2”表示这两个项目都被发现了。 - Zoinky

11

这并不意味着Concurrent Bag不是线程安全的,它只是意味着你的代码不是线程安全的。

你正在检查Concurrent Bag中是否存在某个值,如果先前的检查失败,则添加一个新项。

然而因为你的代码中没有锁定机制,两个任务可能会同时执行以下操作:

THREAD 1        THREAD 2
=-=-=-=-=-=-=-=-=-=-=-=-
Check Exists
                Check Exists
Add New
                Add New

你需要锁住你的检查并添加例程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接