TPL Dataflow,只有在所有源数据块完成时才能保证完成。

34

当两个transformblocks都完成时,我如何重写代码才能使代码完成呢? 我认为完成意味着标记已完成并且“ out队列”为空?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

我编辑了代码,在每个变换块中添加了一个输入缓冲计数。显然,所有100个项目都被流式传输到每个变换块中。但是,一旦其中一个变换块完成,处理器块就不再接受任何更多的项目,而是将不完整的变换块的输入缓冲区刷新。


请注意,转换块可能无法接收广播块的所有消息。它们只会接收到最新的消息。如果广播块提供的消息比转换块能够接收的快,转换块将会错过一些消息。此外,如果您想确保消息顺序等问题,请在 SendAsync(i) 上使用 await - urbanhusky
@urbanhusky,我不确定你所说的TransformBlock会错过消息。文档说明BroadcastBlock保证在接受新项之前将其传播到所有链接的目标。如果目标上的BoundedCapacity未绑定,则目标TransformBlock将缓冲消息。现在,如果设置了BoundedCapacity,则它将阻塞直到有容量,并且这可能会导致丢失消息,特别是使用未等待的SendAsync。简而言之,我的理解是如果无限制,则不会丢失任何消息。 - Kevin Fichter
5个回答

38
问题正是casperOne在他的回答中所说的。一旦第一个转换块完成,处理器块就进入“完成模式”:它将处理其输入队列中剩余的项目,但不会接受任何新项目。
然而,有一个比分裂处理器块更简单的解决方法:不要设置PropagateCompletion,而是在两个转换块完成时手动设置处理器块的完成。
Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());

正是我所寻找的。我没有意识到Task.WhenAll会返回可等待任务,这是我的疏忽。 - Matt
我需要完全相同的内容,也许现在已经太晚了,但您能否发布有关我需要在哪里添加Task.WhenAll结构的更新? - Attila Hajdrik
@AttilaHajdrik 可能在您的数据流设置代码末尾,靠近您的 LinkTo - svick
我尝试过那个方法,但是没有起作用,不过当时已经是凌晨1:30了...虽然我将代码修改为普通TPL并分离了一些任务,我认为我成功地解决了问题,而无需使用DataFlow。 - Attila Hajdrik

32
这里的问题在于每次你调用 LinkTo 方法 来链接块并在转换块中等待不同的时间时,都会设置 PropagateCompletion 属性

IDataflowBlock 接口Complete 方法 文档(强调我的)中可以看到:

向 IDataflowBlock 发出信号,告知其不应再接受不应再生成任何消息,也不应再消耗任何后期消息。

因为在每个 TransformBlock<TInput, TOutput> 实例中你交错了等待时间,所以 transformBlock2(等待 20 毫秒)比 transformBlock1(等待 50 毫秒)先完成。 transformBlock2 先完成,然后发送信号到 processorBlock,然后说“我不接受其他任何东西”(而 transformBlock1 还没有产生所有的消息)。
请注意,处理 transformBlock1transformBlock1 之前的顺序不是绝对保证的;线程池(假设您正在使用默认调度程序)可能会以不同的顺序处理任务(但很可能不会,因为一旦 20 毫秒的项目完成,它就会从队列中窃取工作)。
你的管道看起来像这样:
           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

为了解决这个问题,您需要一个类似于这样的流水线:
           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

只需创建两个单独的ActionBlock<TInput>实例即可实现此目的,如下所示:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

然后,您需要等待两个处理器块,而不是仅等待一个:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

这里有一个非常重要的提示; 创建 ActionBlock<TInput> 时,默认值是将传递给它的 ExecutionDataflowBlockOptions 实例上的 MaxDegreeOfParallelism 属性 设置为1。

这意味着您传递给 ActionBlock<TInput>Action<T> 委托 调用是线程安全的,每次只执行一个。

因为现在您有了两个指向同一 Action<T> 委托的 ActionBlock<TInput> 实例,所以不能保证线程安全性。

如果你的方法是线程安全的,那么你不需要做任何事情(这将允许你将MaxDegreeOfParallelism属性设置为DataflowBlockOptions.Unbounded,因为没有理由阻塞)。
如果它不是线程安全的,并且你需要保证它,你需要使用传统的同步原语,例如lock语句
在这种情况下,你可以像这样做(尽管显然不需要这样做,因为Console上的WriteLine方法是线程安全的):
// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...

1
感谢您提供的详细答案,但我选择了svick的答案,因为它直接适用于TPL Dataflow并提供了非常简洁和简单的解决方案。 - Matt
6
如果你在两个动作块中使用相同的 ExclusiveScheduler,就可以轻松避免锁定。 - svick

9

对svick的回答进行补充:为了与使用PropagateCompletion选项时获得的行为一致,如果前面的块故障,您还需要转发异常。以下类似于扩展方法也会处理这个问题:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}

2
这里有一种方法,它在功能上与pkt的 CompleteWhenAll方法相当,但代码稍微少一些:
public static void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock target)
{
    // Arguments validation omitted
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));
    ThreadPool.QueueUserWorkItem(async _ =>
    {
        try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

        Exception exception = allSourcesCompletion.IsFaulted ?
            allSourcesCompletion.Exception : null;

        if (exception is null) target.Complete(); else target.Fault(exception);
    });
}

使用示例:
PropagateCompletion(new[] { transformBlock1, transformBlock2 }, processorBlock);

“PropagateCompletion”方法是同名更通用方法的一种变体,我在这里发布了该方法。

1

其他答案已经很清楚地解释了为什么当块有超过两个源时,PropagateCompletion=true会使事情变得混乱。

为了提供一个简单的解决方案,您可以查看一个名为DataflowEx的开源库,它内置了更智能的完成规则来解决这种问题。(它在内部使用TPL Dataflow链接,但支持复杂的完成传播。实现类似于WhenAll,但还处理动态链接添加。请查看Dataflow.RegisterDependency()TaskEx.AwaitableWhenAll()以获取详细实现信息。)

我稍微修改了您的代码,以便使用DataflowEx使一切正常:

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

完整的代码在这里

免责声明:我是DataflowEx的作者,其发布在MIT许可下。


请问一下,您是否在Gridsum工作?我的问题明确提到了我需要TPL Dataflow的答案,我不想使用第三方解决方案来解决这个问题。谢谢。 - Matt
3
是的,我在 Gridsum 工作。但这个库是完全免费和开源的,所以我认为它可能对你有所帮助。完全没有商业想法。如果你需要关于 TPL Dataflow 的内部机制,请忽略我的回答。但如果有人需要一个“解决方案”,这个回答就有其价值。谢谢 :) - Dodd
更新答案,添加了更多细节。免责声明也已添加。 - Dodd

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接