TPL Dataflow和下游块中的异常处理

5
我有以下伪代码:
var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5 });
var a = new ActionBlock<int>(async item =>
    {
        await Task.Delay(500);
        Trace.TraceInformation(
            $"Target 1: | Type: {typeof(int).Name} | Thread: {Thread.CurrentThread.ManagedThreadId} | Message: {item}");
        // handling some logic but it throws
        if (item >= 5) throw new Exception("Something bad happened");

    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1, MaxDegreeOfParallelism = 1 });

queue.LinkTo(a, new DataflowLinkOptions { PropagateCompletion = true });

var targets = new List<ITargetBlock<int>> {queue};

var broadcaster = new ActionBlock<int>(
    async item =>
    {
        var processingTasks = targets.Select(async t =>
        {
            try
            {
                // This is condition is always false
                // t (bufferblock) has no exceptions. Exception is raised in downstream action block where it sends to
                if (!await t.SendAsync(item))
                    await t.Completion;
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Handled exception : " + e.Message);
            }
        });

        try
        {
            // Neither here the exception is rethrowed
            await Task.WhenAll(processingTasks);
        }
        catch (Exception e)
        {
            Trace.TraceInformation("Handled exception WhenAll : " + e.Message);
        }
    });

for (var i = 1; i <= 10; i++)
{
    broadcaster.Post(i);
}

管道被配置为这样 ActionBlock<int> => BufferBlock<int> => ActionBlock<int>
最后一个 ActionBlock<int> 抛出异常,但它没有重新抛出到源块,我想在那里处理它。
如何重写此代码以正确处理异常?

你无法处理其他块中的异常。当你向 tpl 块 PostSendAsync 消息时,它只会将其添加到缓冲区中,这就是全部,你既没有对它的引用,也没有对抛出的异常的引用。 - VMAtm
如果t是另一个引发异常的操作块,它确实可以工作。我的问题更多地涉及如何设计具有良好异常处理的管道,而不是在许多地方散布try catch。 - Tomasz Jaskuλa
1个回答

9
您可以在此处找到有关此主题的官方指南完整解决方案是订阅所有块的Completion任务,并检查其状态,必要时替换故障块(应该也存储所有块的引用)。有关更多信息,请参阅整个文章。

带“Faulted”块的网络行为

  1. 保留消息
    为避免消息损坏,故障块应尽快清除其消息队列并进入“Faulted”状态。 有一种情况不遵守这个规则:源块持有目标保留的消息。 如果一个遇到内部异常的块有一个被目标保留的消息,则不能丢弃保留消息, 并且在释放或使用消息之前,不应将块移动到“Faulted”状态。

  2. 挂起的网络
    ...

    • 保留对网络中所有块的引用,并使用Task.WaitAllTask.WhenAll等待它们(同步或异步)。如果块发生故障,则其Completion任务将以“Faulted”状态完成。
    • 在构建线性网络时,使用带有PropagateCompletion == trueDataflowLinkOptions。这将从源传播块完成到目标。在这种情况下,只需等待网络叶块即可。

“网络叶块”是什么意思?这是你的数据流管道的最后一个块吗? - PastExpiry.com
1
@user2972161 这不是我说的话,而是官方文档中的引用。Leaf 意味着图形中的块没有任何子块,因此,是序列中的最后一个块(但不一定是最后一个 - 这取决于您的管道)。 - VMAtm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接