TPL数据流块使用LinkTo谓词

5

我有一些块,最终从TransformBlock转换为基于LinkTo谓词的三个其他变换块之一。我使用DataflowLinkOptions来传播完成状态。问题是当谓词满足并且该块启动时,我的管道的其余部分继续执行。看起来管道应该先等待此块完成。

这段代码大致如下:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status = Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status = Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status = Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

现在,这个方法没有按照我的预期工作,因此我发现得到我想要的行为的唯一方法是将 linkOptions 拿出来,并将以下内容添加到 mainBlock 的 Lambda 中。

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    if (input.Status = Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status = Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status = Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});

那么问题来了,这是唯一使它工作的方法吗?

顺便说一句,我已经通过单个数据项在我的单元测试中运行它以尝试调试管道行为。每个块已经用多个单元测试分别进行了测试。在我的管道单元测试中,断言在块完成执行之前就被击中了,因此失败了。

如果我删除块2和块3的链接,并使用linkOptions调试测试,它可以正常工作。


4
你能否发布能够重现问题的真实代码?你的"类似这样的"代码甚至无法编译。 - Cory Nelson
TPL Dataflow 用于基于 actor 的编程。看起来你正在尝试使用它,而不是只写一个带有 if 语句的异步方法。 - i3arnon
这只是一个出问题的流水线的一部分。但是,在尝试提供完整的测试示例之后,我发现我遇到的问题更多地与在我的块中使用异步委托有关。在我做更多调查之后,我会发布更多内容。 - jmichas
2个回答

9
你的问题与你提问中的代码没有关系,它是正确的:当主块完成时,所有三个后续块也会被标记为完成。
问题出在结尾块: 你在那里也使用了PropagateCompletion,这意味着当任何一个前面的三个块完成时,结尾块就会被标记为完成。你想要的是当所有三个块都完成时才将其标记为完成状态,而你的回答中Task.WhenAll().ContinueWith()的组合能做到这一点(虽然这段代码的第一部分是不必要的,但它和PropagateCompletion完全相同)。
正如你猜测的那样,链接选项传播(至少我猜是这样)将传播不满足linkTo中谓词的块的完成。
是的,它总是传播完成。完成没有与之相关联的任何项,因此将谓词应用于它没有任何意义。也许你总是只有单个项目(这不常见)使你更加困惑?
如果我的猜测是正确的,那么我觉得这是链接选项完成传播中的错误或设计错误。如果从未使用块,则为什么应该将其标记为完成?
为什么不应该?对我来说,这很合理:即使这次没有任何具有Status.Delayed的项目,你仍然希望完成处理这些项目的块,以便任何后续代码都知道所有延迟的项目已经被处理了。事实上它们不存在并不重要。
无论如何,如果你经常遇到这种情况,你可能想创建一个帮助方法,同时将多个源块链接到单个目标块,并正确地传播完成。
public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}

使用方法:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);

谢谢你澄清了这一点。我想你的解释让下游块完成即使它们从未有任何数据流通过也是有道理的。我猜作为一个新手,我把“管道”工作方式想象得与实际不同,只有数据项经过的路径才会“完成”。 - jmichas
在长时间的搜索后来到这里。我觉得Dataflow文档在完成传播方面有些欠缺。感谢您提供的答案。 - Tohnmeister

0

好的。首先我要感谢Cory。当我第一次看到他的评论时,我有点生气,因为我觉得我的代码已经很好地阐述了概念,并且可以很容易地转化为可工作的版本。但无论如何,由于他的评论,我感到需要做一个完整的可测试版本来发布。

在我的测试中,令人惊讶的是,即使它模仿了我的真实代码,我认为会失败的路径却通过了,而我认为会通过的路径却失败了。这让我有点头晕。所以我开始对原始代码进行更多的排列组合。基本上,我创建了同步块和异步块,并制作了两种类型的管道。总共四个,2个同步和2个异步,每个都使用链接选项进行传播,另一个则使用主块中的完成任务。

在将一些任务延迟添加到异步任务后,我发现同步版本通过了测试,而异步版本失败了。

因此,最终解决问题的方法有点不同。事实证明,链接选项传播(至少我猜是这样)将传播不满足linkTo中谓词的块的完成情况。因此,当具有“完成”状态的Thing下降时,它进入Block1。

哦,我应该指出在我制作的完整测试代码中,所有的1、2和3块都连接到同一个EndBlock,这在原始代码中没有显示。

无论如何,在谓词满足并且Thing进入Block1之后,我相信Block2和Block3被设置为完成。这导致了我们在单元测试中等待的EndBlock完成,而Assert失败是因为Block1还没有完成它的工作。

如果我的猜测是正确的,我有点觉得这是链接选项完成传播中的错误或设计错误。为什么一个从未使用过的块会被标记为完成呢?

所以,这就是我解决问题的方法。我去掉了链接选项,并手动连接了完成事件。像这样:

MainBlock.Completion.ContinueWith(t =>
{
Block1.Complete();
Block2.Complete();
Block3.Complete();
});

Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
.ContinueWith(t =>
{
    EndBlock.Complete();
});

这个方法很好用,在我的实际代码中也运行良好。 Task.WhenAll 让我相信未使用的块被设置为完整,这就是自动传播成为问题的原因。

希望这能帮助到有需要的人。当我发布所有的测试代码时,我会回来添加一个链接。

编辑: 这里是测试代码的 gist 链接 https://gist.github.com/jmichas/bfab9cec84f0d1e40e12


我将其标记为答案,因为它就是答案,但svicks的解释解决了我的一些问题,并提供了一种更清晰的方法,通过结合传播完成和任务当所有内容。 - jmichas

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接