BufferBlock在TryReceiveAll后使用OutputAvailableAsync时出现死锁

23

在回答这个问题期间,我编写了以下代码段:an answer

var buffer = new BufferBlock<object>();
var producer = Task.Run(async () =>
{
    while (true)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(100));
        buffer.Post(null);
        Console.WriteLine("Post " + buffer.Count);
    }
});
var consumer = Task.Run(async () =>
{
    while (await buffer.OutputAvailableAsync())
    {
        IList<object> items;
        buffer.TryReceiveAll(out items);
        Console.WriteLine("TryReceiveAll " + buffer.Count);
    }
});
await Task.WhenAll(consumer, producer);
生产者应该每隔100毫秒将物品发布到缓冲区,消费者应该清除缓冲区中的所有物品,并异步等待更多物品出现。
实际发生的情况是,生产者只清除所有物品一次,然后再也没有移动过 `OutputAvailableAsync`。如果我将消费者切换为逐个删除物品,则它可以按预期工作:
while (await buffer.OutputAvailableAsync())
{
    object item;
    while (buffer.TryReceive(out item)) ;
}

我是否有什么误解?如果没有,问题出在哪里?


10
在我看来,这似乎是一个错误。你能想到的最仁慈的解释是,只有在生产者调用Complete()时才应该调用它。当然,这几乎没有任何意义 :) 我认为这个bug是由于TryReceiveAll()中缺少对OfferAsyncIfNecessary()的调用造成的,在TryReceive()中存在但在TryReceiveAll()中却不见了。而且,消息编号计数看起来也很混乱。请将此发布到connect.microsoft.com。 - Hans Passant
1
@HansPassant 完成 - i3arnon
你尝试过将while条件中的await替换为true,并在循环中插入await吗? - guiomie
@guiomie 不行。因为那会创建一个无限循环。 - i3arnon
2个回答

11

这是一个由BufferBlock内部使用的SourceCore中的错误。它的TryReceiveAll方法没有像TryReceive那样打开_enableOffering布尔数据成员。这导致从OutputAvailableAsync返回的任务永远无法完成。

以下是最小复现代码:

var buffer = new BufferBlock<object>();
buffer.Post(null);

IList<object> items;
buffer.TryReceiveAll(out items);

var outputAvailableAsync = buffer.OutputAvailableAsync();
buffer.Post(null);

await outputAvailableAsync; // Never completes

我刚刚在.Net Core存储库中使用此拉取请求修复了它。希望这个修复很快出现在NuGet包中。


2
哎呀,现在已经是2015年9月底了。虽然i3arnon修复了这个错误,但是在错误被修复的两天后发布的版本中仍未解决:Microsoft TPL Dataflow版本4.5.24。
不过,IReceivableSourceBlock.TryReceive(...)函数正常工作。编写一个扩展方法可以解决这个问题。在TPL Dataflow发布新版本后,更改扩展方法将很容易。
/// <summary>
/// This extension method returns all available items in the IReceivableSourceBlock
/// or an empty sequence if nothing is available. The functin does not wait.
/// </summary>
/// <typeparam name="T">The type of items stored in the IReceivableSourceBlock</typeparam>
/// <param name="buffer">the source where the items should be extracted from </param>
/// <returns>The IList with the received items. Empty if no items were available</returns>
public static IList<T> TryReceiveAllEx<T>(this IReceivableSourceBlock<T> buffer)
{
    /* Microsoft TPL Dataflow version 4.5.24 contains a bug in TryReceiveAll
     * Hence this function uses TryReceive until nothing is available anymore
     * */
    IList<T> receivedItems = new List<T>();
    T receivedItem = default(T);
    while (buffer.TryReceive<T>(out receivedItem))
    {
        receivedItems.Add(receivedItem);
    }
    return receivedItems;
}

用法:
while (await this.bufferBlock.OutputAvailableAsync())
{
    // some data available
    var receivedItems = this.bufferBlock.TryReceiveAllEx();
    if (receivedItems.Any())
    {
        ProcessReceivedItems(bufferBlock);
    }
}

我认为在面对多个消费者时,TryReceiveAllEx()和TryReceiveAll()的行为会有所不同。我猜想TryReceiveAll()会获取一个单一连续块,而ex方法则会获取基本上是散射抽样的数据,虽然两种方法都会使该块为空。 - bunt
也许你对于多个监听器的不同行为是正确的。当前版本确实存在差异,因为TryReceiveAll根本不起作用,对于任何人都是如此。如果你使用这种方法,在错误被修复时需要进行的更改将是最小的,只需在这个方法上进行修改即可。 - Harald Coppoolse

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接