如何在 BlockingCollection<T> 实例上执行可中断等待?

4
简单的场景 - 我想在 BlockingCollection<T> 上阻塞(如果它为空)。另一方面,我想在用于信号线程需要终止的 ManualResetEvent 对象上阻塞。
我知道没有办法使用 WaitHandle.WaitAny,因为在 BlockingCollection<T> 上阻塞会同时返回一个项目,并且不容易适应由 WaitHandle.WaitAny 所规定的 API。
我能想到的最简单的方法是在具有超时的 BlockingCollection<T> 上阻塞,然后在等待处理程序上等待 0 超时时间,类似于这样:
ManualResetEvent term = ...;
BlockingCollection<T> coll = ...;
while (true)
{
  T obj;
  bool found = coll.TryTake(out obj, 500);
  if (term.WaitOne(0))
  {
    break;
  }
  if (found)
  {
    // process the obj
  }
}

但我觉得应该有更优雅的解决方案,也许可以使用其他东西代替BlockingCollection<T>

欢迎提出任何想法。

编辑1

如果需要更优雅的解决方案,我可以放弃使用MRE并改用取消令牌。


您可以使用信号量(表示集合中的项数)实现自己的类似于 BlockingCollection<T> 的包装器,然后在信号量和手动重置事件上调用 WaitHandle.WaitAny - Cameron
1
你应该使用 CancellationToken 来向工作单元发出取消信号,而不是使用 MRE。 - Servy
我会看一下GetConsumingEnumerable,并像Servy建议的那样提供一个CancellationToken - Daniel Kelley
你可以在生产者中简单地调用CompleteAdding()。当最后一个项目被接收后,消费者在Take()调用时会抛出异常。 - Hans Passant
2个回答

3

最简单的选择是使用 CancellationToken(通过 CancellationTokenSource 发出信号)而不是使用 ManualResetEvent(如评论中所建议的),结合 GetConsumingEnumerable

var blockingCollection = ...
foreach (var obj in blockingCollection.GetConsumingEnumerable(cancellationToken))
{
    // process the obj
}

另一个选项是使用TPL DataflowBufferBlockAsyncManualResetEvent,这既可以使用类似于ManualResetEvent的结构,又可以异步"阻塞"而不浪费线程:

AsyncManualResetEvent term = ...
BufferBlock<T> buffer = ...
var termTask = term.WaitAsync();
while (true)
{
    var receiveTask = buffer.ReceiveAsync();
    if (termTask == await Task.WhenAny(receiveTask, termTask))
    {
        break;
    }

    T obj = await receiveTask;
    // process the obj
}

你可以根据Stephen Toub的文章:构建异步协作原语,第1部分:AsyncManualResetEvent编写自己的AsyncManualResetEvent,或者使用Visual Studio SDK中的一个:AsyncManualResetEvent

0

CancellationTokenSource 是你需要的;将其插入控制台应用程序并调用它。

private static void DoSomeWork()
{
    BlockingCollection<int> coll = new BlockingCollection<int>();
    CancellationTokenSource source = new CancellationTokenSource();
    ThreadPool.QueueUserWorkItem((s) =>
    {
        Console.WriteLine("Thread started. Waiting for item or cancellation.");
        try
        {
            var x = coll.Take(source.Token);
            Console.WriteLine("Take completed.");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Take cancelled. IsCancellationRequested={0}", source.IsCancellationRequested);
        }
    });
    Console.WriteLine("Press ENTER to cancel wait.");
    Console.ReadLine();
    source.Cancel(false);
    Console.WriteLine("Cancel sent. Press Enter to exit.");
    Console.ReadLine();
}

这假设他能够使用此方法更改类型,从使用MRE更改为使用CancellationToken。这可能不是一个选项。 - Servy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接