如何在第一次异常发生时取消整个TPL数据流?

3
考虑以下示例:
        ActionBlock<TimeSpan> ab = new ActionBlock<TimeSpan>(async _ =>
        {
            await Task.Delay(_);
            throw new Exception();
        }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = Int32.MaxValue });

        ab.Post(TimeSpan.FromSeconds(10d));
        ab.Post(TimeSpan.FromDays(1d));

        await ab.Completion;

正如预期的那样,我在输出窗口中看到在10秒后抛出了异常,但这并不会导致dataFlow完成(在await ab.Completion之后的断点将在1天之前被击中)。
在我的情况下,如果一个计算步骤出现异常,我希望取消整个dataFlow。
我无法看到使用tpl dataflow如何实现这一点...有什么建议吗?
谢谢...
【编辑】正如Ofir提到的,我可以这样做:
        ActionBlock<TimeSpan> ab = new ActionBlock<TimeSpan>(async _ =>
        {
            try
            {
                await Task.Delay(_);
                throw new Exception();// Or any other Task thay may throw an exception.
            }
            catch(Exception)
            {
                cancelTokenSource.Cancel();
                throw;
            }
        }, new ExecutionDataflowBlockOptions() {CancellationToken=cancelTokenSource.Token, MaxDegreeOfParallelism = Int32.MaxValue });

        ab.Post(TimeSpan.FromSeconds(10d));
        ab.Post(TimeSpan.FromDays(1d));

        await ab.Completion;

但我希望避免的正是这个:一遍又一遍地输入try{}catch... 或者更糟糕的是:忘记它... ;)

下一步可以做的是编写ActionBlock构造函数的替代方法来处理它(它会接受一个额外的CancellationTokenSource参数)... 我惊讶于这不是直接与数据流一起提供的... 真的吗?

[最终编辑] 似乎在tpd dataFlow中没有这样的东西,而接受CancellationTokenSource作为参数的ActionBlock“扩展构造函数”(实际上是静态方法)将是可能的解决方法...


与并行循环和PLINQ查询一样,这种并行处理中的异常不会强制中断其他并发处理,也不会在异常发生时被认为构造的处理已完成;如果发生异常,数据流块只有在所有处理都静止后才会完成。 - Stephen Cleary
谢谢...我明白每种用法都应该被支持,但是由于“在第一个意外错误时取消所有操作”是一个常见的场景,所以我想知道是否错过了某个数据流选项...那我就写我的“ActionBlock扩展构造函数”吧... ;) - Sylvain
1个回答

5
您抛出的Exception会使ActionBlock进入故障状态并且会删除所有已缓冲的消息,并且不再接受任何消息。
同样适用于CancelationToken(可在ExecutionDataflowBlockOptions中提供)。

一旦当前处理的消息完成,您将在await ab.Completion处收到AggregateException

Task一样,您必须自己处理已执行消息的中止。

例如,在您提供的示例中,可以通过以下方式实现:

var cancellationTokenSource = new CancellationTokenSource();
var ab = new ActionBlock<TimeSpan>(async _  =>
{
    // await with cancellation token
    await Task.Delay(_, cancellationTokenSource.Token);
    cancellationTokenSource.Cancel();
}, new ExecutionDataflowBlockOptions {CancellationToken = cancellationTokenSource.Token, MaxDegreeOfParallelism = int.MaxValue});

ab.Post(TimeSpan.FromSeconds(10));
ab.Post(TimeSpan.FromSeconds(20));
Thread.Sleep(15000);
cancellationTokenSource.Cancel();
ab.Post(TimeSpan.FromSeconds(100));

try { await ab.Completion; }
catch(TaskCancelationException ex)
{ }

在上述情况下,我们发布了2条消息,这些消息将立即运行。
10秒后,第一条消息将导致Cancel cancellationTokenSource并使得另一条消息(延迟20秒)立即完成,并使ActionBlock进入取消状态。 我们尝试发布的下一条消息不会被接受或执行。
15秒后,当等待completion时,我们将收到一个TaskCancellationException

谢谢。这确实符合我对tpl数据流的理解,但我希望有一个更集成的解决方案。我想要对未处理的异常做出反应...当然,我可以将所有操作包装在try{...}catch(){cancellationTokenSource.Cancel();throw;}中,但每次都写这个并不是很好...我希望tpl数据流有一个更干净的解决方案...但也许没有... ;) - Sylvain
1
@Sylvain,你可以为你的ActionBlock实现一个带有try/catch块的简单包装器,并在整个应用程序中使用它。这是一种类似于面向方面编程的方法。 - VMAtm
是的,就是这个精神...看起来确实没有包含在数据流组件中...我会编辑我的问题并回答“没有这样的东西”...再次感谢,Sylvain。 - Sylvain

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接