如何访问阻塞集合的底层默认并发队列?

10

我有多个生产者和一个消费者。但是,如果队列中还有未被消费的内容,生产者就不应该将其重新排队。(使用默认并发队列的唯一、无重复项的阻塞集合)

if (!myBlockingColl.Contains(item))
    myBlockingColl.Add(item)

然而,阻塞集合没有 contains 方法,也没有提供任何类似 TryPeek() 的方法。我该如何访问底层的并发队列,以便执行以下操作:

if (!myBlockingColl.myConcurQ.trypeek(item)
  myBlockingColl.Add(item)

陷入了困境?


2
如果有这样的方法,那么它必须是一个单一的原子方法。它不能像你的两个摘录那样实现,因为另一个线程可能会在TryPeek和Add之间添加。我认为你没有运气了。你为什么需要这样的能力呢? - David Heffernan
在我的情况下,如果另一个线程在trypeek和add之间添加了内容,那也没关系。我主要是将其作为安全检查来执行的,其中调度程序在多个线程上触发报告生成触发器(因此有多个生产者)。如果调度程序由于任何原因出现故障并在短时间内多次触发相同的触发器,我只是想处理它。我知道我可以通过某种方式解决和处理它。看起来没有优雅的处理方式。谢谢 - Gullu
4个回答

9
这是一个有趣的问题。这是我第一次看到有人要求一个忽略重复项的阻塞队列。奇怪的是,我找不到已经存在于BCL中的类似你想要的东西。我说这很奇怪,因为BlockingCollection可以接受IProducerConsumerCollection作为底层集合,它具有广告中声称能够在检测到重复项时失败的TryAdd方法。问题是,我没有看到任何具体实现IProducerConsumerCollection以防止重复项。至少我们可以自己编写。
public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
  // TODO: You will need to fully implement IProducerConsumerCollection.

  private Queue<T> queue = new Queue<T>();

  public bool TryAdd(T item)
  {
    lock (queue)
    {
      if (!queue.Contains(item))
      {
        queue.Enqueue(item);
        return true;
      }
      return false;
    }
  }

  public bool TryTake(out T item)
  {
    lock (queue)
    {
      item = null;
      if (queue.Count > 0)
      {
        item = queue.Dequeue();
      }
      return item != null;
    }
  }
}

现在我们有了不接受重复项的 IProducerConsumerCollection,可以像这样使用它:
public class Example
{
  private BlockingCollection<object> queue = new BlockingCollection<object>(new NoDuplicatesConcurrentQueue<object>());

  public Example()
  {
    new Thread(Consume).Start();
  }

  public void Produce(object item)
  {
    bool unique = queue.TryAdd(item);
  }

  private void Consume()
  {
    while (true)
    {
      object item = queue.Take();
    }
  }
}

如果你对我的NoDuplicatesConcurrentQueue实现不满意,可以自己使用ConcurrentQueue或其他方式来实现,如果你认为需要TPL集合提供的低锁性能。

更新:

今天早上我测试了代码。好消息和坏消息都有。好消息是这个技术上可行。坏消息是你可能不想这样做,因为BlockingCollection.TryAdd截取了底层的IProducerConsumerCollection.TryAdd方法的返回值,并在检测到false时抛出异常。没错,就是这样。它不像你期望的那样返回false,而是生成一个异常。说实话,这既令人惊讶又荒谬。TryXXX方法的整个意义在于它们不应该抛出异常。我深感失望。


Brian,你是个牛人。让我消化一下,明天如果找到答案我会更新。谢谢。 - Gullu
整个IProducerConsumerCollection需要被实现的事实让我感到重新发明轮子的感觉。一定有更好的方法。我点赞了你的解决方案,因为我可能会做类似的事情。谢谢。 - Gullu
@Gullu:我刚刚更新了我的答案。无论如何,我认为你不会想使用这种策略。请阅读我的更新。 - Brian Gideon
1
在 MSDN 上,关于底层集合返回false的异常描述为:“如果该项是重复项,并且底层集合不接受重复项,则会引发InvalidOperationException异常。” 我认为这里的逻辑是,'Try'前缀指的是与BlockingCollection本身有关的情况。底层出现问题将导致异常。 - MajesticRa
警告:在.NET 7及以下版本中,BlockingCollection<T>存在一个错误,如果底层集合的TryAdd返回false,则会导致其有界容量的破坏。本答案中的代码会触发此错误。解决方法是抛出异常而不是返回false,如此处所示. - Theodor Zoulias
显示剩余2条评论

6
除了Brian Gideon在“更新”后提到的警告之外,他的解决方案存在以下性能问题:
  • 队列上的O(n)操作(queue.Contains(item))会严重影响性能,随着队列的增长
  • 锁限制并发(他确实提到了)
以下代码改进了Brian的解决方案:
  • 使用哈希集进行O(1)查找
  • 组合来自System.Collections.Concurrent命名空间的2个数据结构
注意:由于没有ConcurrentHashSet,我使用ConcurrentDictionary,忽略值。
在这种罕见情况下,幸运的是可以简单地将多个简单的并发数据结构组合成一个更复杂的数据结构,而不需要添加锁。这里操作2个并发数据结构的顺序很重要。
public class NoDuplicatesConcurrentQueue<T> : IProducerConsumerCollection<T>
{
    private readonly ConcurrentDictionary<T, bool> existingElements = new ConcurrentDictionary<T, bool>();
    private readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public bool TryAdd(T item)
    {
        if (existingElements.TryAdd(item, false))
        {
            queue.Enqueue(item);
            return true;
        }
        return false;
    }

    public bool TryTake(out T item)
    {
        if (queue.TryDequeue(out item))
        {
            bool _;
            existingElements.TryRemove(item, out _);
            return true;
        }
        return false;
    }
    ...
}

注意:从另一个角度看,你需要一个保留插入顺序的集合。

1
警告:在.NET 7及以下版本中,BlockingCollection<T>存在一个错误,如果底层集合的TryAdd返回false,则会导致其有界容量的破坏。本答案中的代码会触发此错误。解决方法是抛出异常而不是返回false,如此处所示. - Theodor Zoulias

1
我建议您使用锁来实现操作,以避免以破坏性的方式读写项目,使它们具有原子性。例如,对于任何IEnumerable:
object bcLocker = new object();

// ...

lock (bcLocker)
{
    bool foundTheItem = false;
    foreach (someClass nextItem in myBlockingColl)
    {
        if (nextItem.Equals(item))
        {
            foundTheItem = true;
            break;
        }
    }
    if (foundTheItem == false)
    {
        // Add here
    }
}

1
可以通过传统的加锁方式来实现。由于我正在使用新的net 4类,大部分情况下都可以在没有显式加锁的情况下正常工作,所以我希望有更酷的解决方法。谢谢。 - Gullu
没有更酷的方法 - 没有锁什么都不会起作用。即使是两个加法也可能会发生冲突。 - Ed Bayiates
每个可能改变集合的操作都需要使用此锁进行保护。 - David Heffernan

0
如何访问阻塞集合的默认并发队列?
默认情况下,BlockingCollection由ConcurrentQueue支持。换句话说,如果您不明确指定其后备存储,则它将在幕后创建一个ConcurrentQueue。由于您想直接访问底层存储,因此可以手动创建一个ConcurrentQueue并将其传递给BlockingCollection的构造函数:
ConcurrentQueue<Item> queue = new();
BlockingCollection<Item> collection = new(queue);

很不幸,ConcurrentQueue<T>集合没有带有输入参数的TryPeek方法,因此您打算做的事情是不可能的:

if (!queue.TryPeek(item)) // Compile error, missing out keyword
    collection.Add(item);

请注意,queue现在归collection所有。如果您尝试直接对其进行变异(通过发出EnqueueTryDequeue命令),则collection将抛出异常。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接