C#如何在BlockingCollection<T>中更新元素?

3

我有一个ConcurrentDictionaryWrapper类,它包装了ConcurrentDictionary<K,V>并实现了IProducerConsumerCollection接口,这样我就可以将其与BlockingCollection一起使用。生产者将使用一个键将值添加到BlockingCollection中。思路是,如果键存在,则替换底层字典中的值。我的ConcurrentDictionaryWrapper.TryAdd()方法如下:

public bool TryAdd(KeyValuePair<TKey, TValue> item)
{
    _wrapped[item.Key] = item.Value
    return true;
}

我看到的问题是,如果值被替换,BlockingCollection将视其为添加操作。

var wrapper = new ConcurrentDictionaryWrapper<string, object>();
var bc = new BlockingCollection<KeyValuePair<string, object>>(wrapper);
bc.TryAdd(new KeyValuePair<string, object>("key", new object()));
bc.TryAdd(new KeyValuePair<string, object>("key", new object()));
wrapper.Count; # returns 1
bc.Count; # returns 2

TryAdd()中我无法返回false,因为BlockingCollection会引发InvalidOperationException异常。
有没有办法实现我想要的行为呢?我希望这尽可能简单,但似乎没有一种方法可以通过仅实现IProducerConsumerCollection来实现具有“添加或更新”行为的BlockingCollection。我想避免在调用BlockingCollection上的TryAdd()之前进行TryTake() - 我可能会在Dictionary周围使用标准锁并使用AutoResetEvent同步生产者/消费者。
2个回答

1
也许,与其替换,将先前的项目标记为无效可能更合适。每个值都保留一个布尔标志。当添加具有现有键的项目时,查找先前的项目,将其标志设置为false,并添加带有标志设置为true的新项目。
然后,当消费者获取项目时,它会检查其是否有效。如果无效,它将丢弃并请求新的。

这样做是可行的,但我猜使用简单锁来访问数据和事件原语来同步线程会更有效率? - kenkam

1
这里存在相当大的摩擦,BlockingCollection<> 保持自己的计数,并不使用 IProducerConsumerCollection.Count 实现。因此,由于您没有失败 TryAdd(),BlockingCollection 现在确信集合中有两个项。即使您只能检索其中一个。所以你肯定不能让它保持原样,否则你的代码将永远死锁。

你需要的是一个 BlockingCollection 方法,如果该项已经存在,则返回false或静默失败。但它没有这个方法,InvalidOperationException 是不可避免的。你可以捕捉它,但那太丑陋了。

另一种方法是使用 Contains() 方法,如果它返回true,则避免调用 TryAdd()。但现在集合不再是线程安全的。这就是为什么 BlockingCollection 没有该方法。

这是一个进退两难的局面,你不能按照设想让它工作。你必须自己编写代码。最好偷窥一下线程大师的代码,这篇杂志文章中的有界缓冲区应该是一个很好的开始,以获得正确的锁设计。


谢谢你的回答,这证实了我的怀疑。我被 https://dev59.com/sVbUa4cB1Zd3GeqPBbkj 误导了--我认为那里的例子之所以有效是因为没有任何项目被“更新”,因为它们都在被主线程消耗之前就已经被消耗完了。 - kenkam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接