Dataflow管道中的全局分块错误处理

12

我正在设计一个长时间运行的数据流水线,由多个块组成。项被送到管道的输入块中,最终通过它并在UI中显示(作为对用户的礼貌,但管道的真正工作是将处理结果保存到磁盘中)。

管道块内的lambda函数可能会因各种原因(错误的输入、网络故障、计算过程中出错等)而抛出异常。在这种情况下,我希望将有问题的项排除,并在UI下方的“错误”部分显示它。

那么最好的方法是什么?我知道可以在每个单独的lambda函数中使用try/catch来实现:

var errorLoggingBlock = new ActionBlock<Tuple<WorkItem, Exception>>(...)

var workerBlock = new TransformBlock<WorkItem, WorkItem>(item => 
{
    try {
        return DoStuff(item);
    } catch (Exception ex) {
        errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
        return null;
    }
}

但是我有大约10个块在管道中,把那段代码复制/粘贴到每一个块中似乎很愚蠢。而且,我不喜欢返回null的想法,因为现在所有下游块都将不得不检查它。

我的下一个最好的想法是创建一个函数,返回一个lambda表达式来替代我进行包装:

  private Func<TArg, TResult> HandleErrors<TArg, TResult>(Func<TArg, TResult> f) where TArg:WorkItem
  {
     return arg =>
     {
        try {
           return f(arg);
        } catch (Exception ex) {
           errorLoggingBlock.SendAsync(Tuple.Create(item, ex));
           return default(TResult);
        }
     };
  }

但这似乎有点太元了。有更好的方法吗?


你可以看一下 Stephen Cleary 的极简 Try 库。它允许将消息通过管道的所有块,并在最后观察到此消息发生的任何异常。 - Theodor Zoulias
1个回答

4

这是一个非常有趣的主题。

在连接块时,您可以定义过滤器,这意味着您可以将错误结果重定向到错误处理块。为此,块应该返回包含其处理结果和至少一个失败/成功指示器的“元”对象。

这个想法在铁路导向编程中更好地描述,其中链中的每个函数都会处理成功的结果或将失败的结果重定向到“失败轨迹”以进行日志记录。

实际上,这意味着您应该在每个块之后添加两个链接:一个带有过滤条件,将其重定向到错误处理块,另一个默认链接将进入流程中的下一步。

甚至可以将两个思路结合起来处理部分故障。部分故障的结果将包含失败指标和有效负载。您可以将结果重定向到日志记录块,然后再将其传递给下一步骤。

我发现要明确每条消息的状态要容易得多,而不是尝试通过检查null、丢失的值等来确定其状态。这意味着块应该将它们的结果包装在“信封”对象中,其中包含状态标志、结果和/或任何错误。


谢谢,我想我理解了这个概念,但这并没有回答我的问题。我该如何自动确保所有块在其处理函数抛出错误时都发出错误信号项?正如我在问题中所说的,我可以手动将它们全部包装在try/catch中,但肯定有更好的方法... - Bugmaster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接