如何在分叉的TPL Dataflow管道中正确等待完成?

3
我有一个TPL Dataflow管道,其中一个目标块链接到两个传播块,然后这两个块都链接到源块。所有块都使用PropagateCompletion = true 链接。第一个传播块链接到只接受偶数的过滤器,第二个传播块链接到接受所有剩余消息的过滤器。
在发布了最后一条消息后,我将第一个块设置为已完成。然而,似乎存在竞争条件。最终块似乎有时会处理所有值,但有时仅处理第一个传播块接受的值和第二个传播块接受的部分值。
我感觉存在竞争条件。但我不知道如何正确地指示最终源块,只有在链接到它的两个传播块转发其所有消息后,才认为一切都已完成。
以下是我的代码简化为一个简单的示例:
    internal static class Program
    {
        public static async Task Main(string[] args)
        {
            var linkOptions = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };
            var bufferBlock = new BufferBlock<int>();
            var fork1 = new TransformBlock<int, int>(n => n);
            var fork2 = new TransformBlock<int, int>(n =>
            {
                Thread.Sleep(100);
                return n;
            });
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock, linkOptions);
            fork2.LinkTo(printBlock, linkOptions);
            
            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }
            bufferBlock.Complete();

            await printBlock.Completion;
        }
    }

这将输出:

2
4
6
8
10

而我希望它输出:

2
4
6
8
10
1
3
5
7
9
3个回答

2

您的数据流图中有一颗钻石,它会通过两个分支之一加速完成传播,从而导致最终块过早地完成。

可以使用任务继续来自定义最后一个块的完成:

          ...
            var printBlock = new ActionBlock<int>(Console.WriteLine);

            bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
            bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
            
            fork1.LinkTo(printBlock); // no completion propagation
            fork2.LinkTo(printBlock);
           
            Task.WhenAll(fork1.Completion, fork2.Completion)
               .ContinueWith(t => printBlock.Complete(), 
                   CancellationToken.None, 
                   TaskContinuationOptions.ExecuteSynchronously, 
                   TaskScheduler.Default);

            for (var n = 1; n <= 10; ++n)
            {
                bufferBlock.Post(n);
            }

            bufferBlock.Complete();

            await printBlock.Completion;


谢谢!这正是我需要的。我已经在一个Gist中找到了它,并将该Gist链接到了一个类似但不同措辞的问题。尽管如此,我仍会将您的答案标记为解决方案。 - Tohnmeister
1
一个小细节:你应该将 Task.WhenAll() 的结果分配给一个任务,以便在方法末尾可以await,以确保观察到任何异常。 - Matthew Watson
@MatthewWatson 在这种特定情况下,保留对ContinueWith任务的引用没有太多价值。如果该任务失败,则永远不会到达方法的末尾,因为printBlock.Completion永远不会完成。这就是为什么我更喜欢在async void方法内部处理这些关键完成传播,因为在(极不可能的)失败情况下,我更喜欢我的应用程序崩溃而不是挂起。 - Theodor Zoulias
alexm,这个ContinueWith的用法违反了这个准则:不要在没有传递TaskScheduler的情况下创建任务 - Theodor Zoulias

0
PropagateCompletion更改为false即可解决问题。
var linkOptions = new DataflowLinkOptions
{
    PropagateCompletion = false
};

这样,分支1的完成不会停止打印块,您可以在最后手动完成它。


0

我来回答这个问题,因为我在另一个问题这里找到了答案。

问题在于fork1和fork2都与printBlock链接,并且使用PropagateCompletion = true。即使fork1跳过了一些消息,它仍然将完成状态传播到printBlock,导致printBlock在fork2处理任何或所有消息之前就已经完成了。

解决方案是替换

fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);

使用

fork1.LinkTo(printBlock);
fork2.LinkTo(printBlock);

Task.WhenAll(fork1.Completion, fork2.Completion).ContinueWith(_ => printBlock.Complete());

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接