LinkTo
方法使用PropagateCompletion
配置,将源块的完成状态传递到目标块。因此,如果源块失败,失败会传播到目标块,最终两个块都将完成。但是,如果目标块失败,则源块不会收到通知,并且将继续接受和处理消息。如果我们在混合中添加BoundedCapacity
配置,则源块的内部输出缓冲区很快就会变满,阻止它接受更多消息。正如您所发现的那样,这很容易导致死锁。
为了防止死锁的发生,最简单的方法是确保管道中任何一个块出现错误都会尽快完成其所有组成块。其他方法也是可能的,正如 Stephen Cleary 的
答案 所示,但在大多数情况下,我希望采用快速失败的方法来实现期望的行为。令人惊讶的是,这种简单的行为并不容易实现。没有内置机制可供此目的使用,手动实现也很棘手。
自.NET 6起,強制完成數據流水線中的一個塊的唯一可靠方法是
Fault
該塊,並通過將其鏈接到
NullTarget
來丟棄其輸出緩沖區。僅單獨 Faulting 該塊,或通過
CancellationToken
選項取消該塊不足夠。有些情況下,失敗或取消的塊將無法完成。
這裡演示了第一種情況(失敗且未完成),
這裡演示了第二種情況(取消且未完成)。這兩種情況都要求該塊已被標記為已完成,這可能發生在所有參與數據流水線且配置為使用
PropagateCompletion
的塊上自動且不確定地。存在報告此問題行為的 GitHub 帖子:
No way to cancel completing dataflow blocks。截至本文撰寫時,開發人員尚未提供任何反饋。
有了这个知识,我们可以实现一个类似于 LinkTo
的方法,它可以创建像这样的快速失败管道:
public static void ConnectTo<TOutput>(this ISourceBlock<TOutput> source,
ITargetBlock<TOutput> target)
{
source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
ThreadPool.QueueUserWorkItem(async _ =>
{
try { await target.Completion.ConfigureAwait(false); } catch { }
if (!target.Completion.IsFaulted) return;
if (source.Completion.IsCompleted) return;
source.Fault(new Exception("Pipeline error."));
source.LinkTo(DataflowBlock.NullTarget<TOutput>());
});
}
使用示例:
var data_buffer = new BufferBlock<int>(new() { BoundedCapacity = 1 });
var process_block = new ActionBlock<int>(
x => throw new InvalidOperationException(),
new() { BoundedCapacity = 2, MaxDegreeOfParallelism = 2 });
data_buffer.ConnectTo(process_block); // Instead of LinkTo
foreach (var k in Enumerable.Range(1, 5))
if (!await data_buffer.SendAsync(k)) break;
data_buffer.Complete();
await process_block.Completion;
你也可以考虑在等待最后一个管道块之前(或者在finally
区域之后)等待所有管道组成部分。这样做的好处在于,如果发生故障,您就不会冒风险泄漏在后台运行但未被观察到的“fire-and-forget”操作,直到管道的下一个重生:
try { await Task.WhenAll(data_buffer.Completion, process_block.Completion); } catch { }
您可以忽略由await Task.WhenAll
操作引发的所有可能出现的错误,因为等待最后一个块将传达大部分与错误相关的信息。在下游块失败后,您可能会错过上游块中发生的其他错误。如果您想要观察所有错误,那么这将是棘手的,因为错误是向下传播的:您可能会多次观察到相同的错误。如果您想要勤奋地记录每个错误,那么最好在处理块的lambda内部进行日志记录,而不是依赖它们的Completion
属性。
缺点:上面的ConnectTo
实现以一块一块地向后传播故障。传播不是即时的,因为故障块在任何当前正在处理的消息的处理完成之前不会完成。如果管道很长(5-6块或更多),并且每个块的工作量很大,则这可能是一个问题。这种额外的延迟不仅浪费时间,而且还浪费资源,因为这些工作最终都会被丢弃。
我已经上传了一个更复杂的版本的ConnectTo
想法到this GitHub存储库中。它解决了前一段提到的延迟完成问题:任何块中的故障都会立即传播到所有块。作为奖励,它还将管道中的所有错误作为平面AggregateException
传播。
注意:此答案已经完全重写。原始答案(修订版4)包含了一些错误的想法,以及ConnectTo
方法的有缺陷的实现。
BoundedCapacity
配置块的容量小于MaxDegreeOfParallelism
,则会将并行度降低到容量值。换句话说,如果只允许缓冲一个项目,则该块无法同时处理两个项目。我认为这是因为在处理两个项目后,它应该将两个结果存储在其输出缓冲区中,但没有足够的空间来存储两个结果。 - Theodor ZouliasActionBlock
,是的,这是有道理的,因为该块只有一个输入队列而没有输出。但实际上,即使是ActionBlock
也受到相同的规则控制,原因可能是为了保持一致性。 - Theodor Zoulias