TPL Dataflow有界容量转换块中的异常

12

我需要构建TPL数据流管道来处理大量的消息。因为有很多消息,我不能简单地将它们PostBufferBlock的无限队列中,否则我将面临内存问题。所以我想使用BoundedCapacity = 1选项禁用队列,并使用MaxDegreeOfParallelism来使用并行任务处理,因为我的TransformBlock对每个消息可能需要一些时间。我还使用PropagateCompletion使所有完成和失败都能向下传播到管道。

但是当错误发生在第一个消息之后时,我遇到了错误处理问题:调用await SendAsync会将我的应用程序切换到无限等待状态。

我已经将我的情况简化为示例控制台应用程序:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;

顺便提一下,如果使用BoundedCapacity配置块的容量小于MaxDegreeOfParallelism,则会将并行度降低到容量值。换句话说,如果只允许缓冲一个项目,则该块无法同时处理两个项目。我认为这是因为在处理两个项目后,它应该将两个结果存储在其输出缓冲区中,但没有足够的空间来存储两个结果。 - Theodor Zoulias
可能是吧,但对我来说至少不太直观。我以为"缓冲区"是指所有溢出的东西。因此,如果我们有2个工人和1个缓冲区容量,它会获取2个项目并将它们分配给每个工人,然后再取1个项目 "提前"。 - Michael Logutov
关于 ActionBlock,是的,这是有道理的,因为该块只有一个输入队列而没有输出。但实际上,即使是 ActionBlock 也受到相同的规则控制,原因可能是为了保持一致性。 - Theodor Zoulias
2个回答

10
这是预期的行为。如果“下游”出现故障,则错误不会向网格“向后”传播。网格希望您检测到该故障(例如,通过process_block.Completion),并解决它。
如果您想向后传播错误,可以在process_block.Completion上使用await或继续操作,如果下游块出现故障,则会使上游块出现故障
请注意,这不是唯一的解决方案;您可能需要重建该部分网格或将源链接到替代目标。源块未出现故障,因此它们可以继续使用修复后的网格进行处理。

那么,我应该如何等待BufferBlock再次可用以发送数据?在数据排队的末尾,我应该等待什么? - Michael Logutov
1
有道理。我的意思是,如果任何一个块出现故障,整个流水线都会停止,并且异常会冒泡到主线程。 - Michael Logutov
4
它可以传播BufferBlock中的任何故障到ActionBlock,但数据、完成或故障都不会在链接中向传播。 - Stephen Cleary
4
@pnewhook说:我更倾向于在出现故障时拆除整个网格(我想大多数人都是这样的),但我们不一定非得那样做。如果一个块没有传播完成并发生故障,那么你可以将其与其他块取消链接并放入替换块(而其余的网格仍然在运行)。 - Stephen Cleary
1
请注意,如果您等待的不仅是 process_block 块,还包括 data_buffer 块,则可能会出现死锁,因为只有在处理完现有项目后,data_buffer 块才会完成。而且,由于 process_block 的容量有限,它可能永远无法完成。 - stil
显示剩余4条评论

0

LinkTo方法使用PropagateCompletion配置,将源块的完成状态传递到目标块。因此,如果源块失败,失败会传播到目标块,最终两个块都将完成。但是,如果目标块失败,则源块不会收到通知,并且将继续接受和处理消息。如果我们在混合中添加BoundedCapacity配置,则源块的内部输出缓冲区很快就会变满,阻止它接受更多消息。正如您所发现的那样,这很容易导致死锁。

为了防止死锁的发生,最简单的方法是确保管道中任何一个块出现错误都会尽快完成其所有组成块。其他方法也是可能的,正如 Stephen Cleary 的 答案 所示,但在大多数情况下,我希望采用快速失败的方法来实现期望的行为。令人惊讶的是,这种简单的行为并不容易实现。没有内置机制可供此目的使用,手动实现也很棘手。
自.NET 6起,強制完成數據流水線中的一個塊的唯一可靠方法是Fault該塊,並通過將其鏈接到NullTarget來丟棄其輸出緩沖區。僅單獨 Faulting 該塊,或通過 CancellationToken 選項取消該塊不足夠。有些情況下,失敗或取消的塊將無法完成。這裡演示了第一種情況(失敗且未完成),這裡演示了第二種情況(取消且未完成)。這兩種情況都要求該塊已被標記為已完成,這可能發生在所有參與數據流水線且配置為使用 PropagateCompletion 的塊上自動且不確定地。存在報告此問題行為的 GitHub 帖子:No way to cancel completing dataflow blocks。截至本文撰寫時,開發人員尚未提供任何反饋。

有了这个知识,我们可以实现一个类似于 LinkTo 的方法,它可以创建像这样的快速失败管道:

/// <summary>
/// Connects two blocks that belong in a simple, straightforward,
/// one-way dataflow pipeline.
/// Completion is propagated in both directions.
/// Failure of the target block causes purging of all buffered messages
/// in the source block, allowing the timely completion of both blocks.
/// </summary>
/// <remarks>
/// This method should be used only if the two blocks participate in an exclusive
/// producer-consumer relationship.
/// The source block should be the only producer for the target block, and
/// the target block should be the only consumer of the source block.
/// </remarks>
public static void ConnectTo<TOutput>(this ISourceBlock<TOutput> source,
    ITargetBlock<TOutput> target)
{
    source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await target.Completion.ConfigureAwait(false); } catch { }
        if (!target.Completion.IsFaulted) return;
        if (source.Completion.IsCompleted) return;
        source.Fault(new Exception("Pipeline error."));
        source.LinkTo(DataflowBlock.NullTarget<TOutput>()); // Discard all output
    });
}

使用示例:

var data_buffer = new BufferBlock<int>(new() { BoundedCapacity = 1 });

var process_block = new ActionBlock<int>(
    x => throw new InvalidOperationException(),
    new() { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });

data_buffer.ConnectTo(process_block); // Instead of LinkTo

foreach (var k in Enumerable.Range(1, 5))
    if (!await data_buffer.SendAsync(k)) break;

data_buffer.Complete();
await process_block.Completion;

你也可以考虑在等待最后一个管道块之前(或者在finally区域之后)等待所有管道组成部分。这样做的好处在于,如果发生故障,您就不会冒风险泄漏在后台运行但未被观察到的“fire-and-forget”操作,直到管道的下一个重生:

try { await Task.WhenAll(data_buffer.Completion, process_block.Completion); } catch { }

您可以忽略由await Task.WhenAll操作引发的所有可能出现的错误,因为等待最后一个块将传达大部分与错误相关的信息。在下游块失败后,您可能会错过上游块中发生的其他错误。如果您想要观察所有错误,那么这将是棘手的,因为错误是向下传播的:您可能会多次观察到相同的错误。如果您想要勤奋地记录每个错误,那么最好在处理块的lambda内部进行日志记录,而不是依赖它们的Completion属性。

缺点:上面的ConnectTo实现以一块一块地向后传播故障。传播不是即时的,因为故障块在任何当前正在处理的消息的处理完成之前不会完成。如果管道很长(5-6块或更多),并且每个块的工作量很大,则这可能是一个问题。这种额外的延迟不仅浪费时间,而且还浪费资源,因为这些工作最终都会被丢弃。

我已经上传了一个更复杂的版本的ConnectTo想法到this GitHub存储库中。它解决了前一段提到的延迟完成问题:任何块中的故障都会立即传播到所有块。作为奖励,它还将管道中的所有错误作为平面AggregateException传播。


注意:此答案已经完全重写。原始答案(修订版4)包含了一些错误的想法,以及ConnectTo方法的有缺陷的实现。


最近我意识到这个模式是错误的。使用CancellationTokenSource或该模式并没有什么问题。数据流不是什么新鲜事物,也不仅限于.NET平台。有些特定的模式在各种语言和平台上都会重复出现,即使它们没有被包含在命名空间的文档中。即使TPL Dataflow也不是新的,它已经存在了将近10年。它从来不需要使用异常作为控制流来传播完成或取消。 - Panagiotis Kanavos
"async void" 只是在招惹问题。一个简单的 "source.Completion.ContinueWith(_=>target.Complete());" 就可以完成同样的工作,而不会冒着 ObjectDisposedException 的风险或者在异步状态机内无缘无故地抛出异常的代价。 - Panagiotis Kanavos
@PanagiotisKanavos,我认为我已经很详细地解释了在管道中只等待最后一个块时出现的问题。 TPL Dataflow的年代以及其他语言和平台正在做什么都不重要。错误的模式不会因为它变老而变得正确。关于原始ContinueWith方法,我轻松地拒绝了它,转而采用负责任且不泄漏的async void方法。如果我的ConnectTo方法有缺陷,我希望这个错误能够被公开揭示,而不是在黑暗中被忽略。 - Theodor Zoulias
catch { } ?? .NullTarget<TOutput>() ? That's the definition of swallowed in the dark - Panagiotis Kanavos
在代码中使用空的“catch”块来吞掉错误是有意为之的。这与“ConnectTo”的工作原理无关,因为该方法不拥有源块和目标块的错误处理权。但是,“ConnectTo”对于“target.Complete();”行却有所有权。由于无法以合理的方式处理此行的失败,所以允许将其作为未处理的异常传播。在我看来,这是正确的做法。将此行包装在一个空的try-catch中是不负责任的。这本质上就是“ContinueWith”正在做的事情(如果您让返回的“Task”泄漏)。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接