我有一个TPL Dataflow管道,其中一个目标块链接到两个传播块,然后这两个块都链接到源块。所有块都使用PropagateCompletion = true 链接。第一个传播块链接到只接受偶数的过滤器,第二个传播块链接到接受所有剩余消息的过滤器。
在发布了最后一条消息后,我将第一个块设置为已完成。然而,似乎存在竞争条件。最终块似乎有时会处理所有值,但有时仅处理第一个传播块接受的值和第二个传播块接受的部分值。
我感觉存在竞争条件。但我不知道如何正确地指示最终源块,只有在链接到它的两个传播块转发其所有消息后,才认为一切都已完成。
以下是我的代码简化为一个简单的示例:
在发布了最后一条消息后,我将第一个块设置为已完成。然而,似乎存在竞争条件。最终块似乎有时会处理所有值,但有时仅处理第一个传播块接受的值和第二个传播块接受的部分值。
我感觉存在竞争条件。但我不知道如何正确地指示最终源块,只有在链接到它的两个传播块转发其所有消息后,才认为一切都已完成。
以下是我的代码简化为一个简单的示例:
internal static class Program
{
public static async Task Main(string[] args)
{
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true
};
var bufferBlock = new BufferBlock<int>();
var fork1 = new TransformBlock<int, int>(n => n);
var fork2 = new TransformBlock<int, int>(n =>
{
Thread.Sleep(100);
return n;
});
var printBlock = new ActionBlock<int>(Console.WriteLine);
bufferBlock.LinkTo(fork1, linkOptions, n => n % 2 == 0);
bufferBlock.LinkTo(fork2, linkOptions, n => n % 2 != 0);
fork1.LinkTo(printBlock, linkOptions);
fork2.LinkTo(printBlock, linkOptions);
for (var n = 1; n <= 10; ++n)
{
bufferBlock.Post(n);
}
bufferBlock.Complete();
await printBlock.Completion;
}
}
这将输出:
2
4
6
8
10
而我希望它输出:
2
4
6
8
10
1
3
5
7
9
Task.WhenAll()
的结果分配给一个任务,以便在方法末尾可以await
,以确保观察到任何异常。 - Matthew WatsonContinueWith
任务的引用没有太多价值。如果该任务失败,则永远不会到达方法的末尾,因为printBlock.Completion
永远不会完成。这就是为什么我更喜欢在async void
方法内部处理这些关键完成传播,因为在(极不可能的)失败情况下,我更喜欢我的应用程序崩溃而不是挂起。 - Theodor ZouliasContinueWith
的用法违反了这个准则:不要在没有传递TaskScheduler的情况下创建任务。 - Theodor Zoulias