TPL数据流块取消的正确方法

4
我正在使用TPL块来执行可能被用户取消的操作: 我想到了两个选项,第一个选项是取消整个块,但不取消块内的操作。例如:
_downloadCts = new CancellationTokenSource();

var processBlockV1 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct );
    var entities = properties
        .AsParallel()
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
}, new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token });

我取消了内部操作,但没有取消块本身:

var processBlockV2 = new TransformBlock<int, List<int>>(construct =>
{
    List<int> properties = GetPropertiesMethod(construct);
    var entities = properties
        .AsParallel().WithCancellation(_downloadCts.Token)
        .Select(DoSometheningWithData)
        .ToList();
    return entities;
});

据我理解,第一种选项会取消整个块,从而关闭整个流水线。我的问题是:如果有任何资源(例如打开的StreamReaders等),它是否也会取消内部操作并处理所有资源,还是最好选择第二个选项,然后我自己可以确保取消和清理所有内容,然后我可以使用某些手段(铁路编程)来将抛出的 OperationCanceledException 向下传递管道,并在我想要的地方对其进行处理?

2
你应该将你的取消令牌传递给所有的方法。 - sommmen
1
“Cancel”只是表示您希望事情结束。除非有东西检查这个标志,否则它不会结束。这意味着DoSometheningWithData()也应该检查_downloadCts,可能使用_downloadCts.ThrowIfCancellationRequested()。如果您正在处理可处置资源,请将它们放在using()块中以确保它们被丢弃。 - user585968
TPL 的真正威力在于从几个不同的构建块中构建管道。因此,它被设计成这样一种方式:每当 ISourceBlock 结束(无论出于什么原因),它就可以通知所有链接的 ITargetBlock。为了使其像这样工作,您必须在 Link 调用期间指定以下内容:new DataflowLinkOptions { PropagateCompletion = true } 我的建议是考虑取消 生产者 部分,而不是 消费者 - Peter Csala
1个回答

3
这两个选项并不相等。
  1. 第一种选择(CancellationToken = _downloadCts.Token)将使processBlockV1块丢弃其缓冲区中当前存在的任何消息(其InputCount属性将变为0),并停止接受新消息(调用其Post方法将无法返回false)。但它不会停止正在处理的消息。这些消息将被完全处理,但不会传播到任何链接的块。在这些消息处理完成后,块将以取消状态完成(其Completion.Status属性将变为Canceled)。

  2. 第二种选择(取消内部操作)对整个块没有影响。数据流块容忍从其处理函数抛出的任何OperationCanceledException,并忽略故障项并继续下一个。因此,在令牌取消后,所有发布的消息仍将被处理,并且块将继续接受更多消息。它只是不会将任何内容传播到其链接的块,因为所有项都会抛出OperationCanceledException并被忽略。在特定示例中,GetPropertiesMethod方法将为所有construct消息调用,因此会延迟块的完成。块的最终状态将为RanToCompletion

重要的是要知道,Dataflow块严格遵守“完成”概念。在报告完成之前,它们将等待所有已知的任务停止运行。如果您确实希望它们过早地完成并留下仍在运行的任务,则需要进行一些技巧,例如使用可取消的包装器来包装您的任务


嗨,Theodor,谢谢你的回答!根据这些信息,我认为对于我的情况,最好的方案是在块定义本身中使用 new ExecutionDataflowBlockOptions() { CancellationToken = _downloadCts.Token },然后将相同的令牌传递到内部处理方法中以停止它正在执行的任何操作。我进行了测试,相同的令牌能够取消块,然后在内部方法中再次“触发”。我的问题是 - 它是否保证总是在内部方法中第二次“触发”? - niks
嗨@niks!是的,它将对当前处理的所有项目触发。当前处理的项目数量主要取决于传递到块构造函数中的设置MaxDegreeOfParallelism。默认值为1。 - Theodor Zoulias
我明白了。还有一件有趣的事情是,如果我使用Try库(https://github.com/StephenCleary/Try)并包装异常以便它们可以通过管道传递,如果`CancelationToken`在`TransformBlock`内部和外部都被触发,则即使在内部处理方法中抛出了`OperationCanceledException`,它也不会向下传递到最终块。不确定这是否是一个问题,但我认为在实现这样的解决方案时应该注意这一点。 - niks
在理论上,PropagateCompletion = true 可能会通过令牌抢先取消,导致某些块完成时错误地标记为成功而不是已取消。但在实践中,似乎不存在竞争条件。显然,在完成传播方面存在延迟,可能是故意的,也可能正是因为这个原因。 - Theodor Zoulias
明白了,你已经为我澄清了这个问题!再次感谢 - Efcharistó! :) - niks
显示剩余5条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接