TPL数据流异步调度

5

TPL Dataflowasync Task的调度并不像我预期的那样工作。在下面的示例中,我期望ActionBlock在可用时立即处理来自TransformBlock的数据。但它在继续处理第三个结果之前等待第二个(延迟的)结果。我在这里误解了什么?是否有一些对处理顺序的要求?

public class TestDataFlow
{
    public System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();

    public async Task Flow()
    {
        watch.Start();

        var plus10 = new TransformBlock<int, int>(async input =>
        {
            if (input == 2)
            {
                await Task.Delay(5000);
            }
            Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
            return input + 10;
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        var printSolution = new ActionBlock<int>(input =>
        {
            Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4,
        });

        plus10.LinkTo(printSolution);

        List<int> inputs = new List<int> { 1, 2, 3 };
        foreach (var input in inputs)
        {
            await plus10.SendAsync(input);
        }
    }
}

输出:

Exiting plus10 for input 1 @ 115.8583
Exiting plus10 for input 3 @ 116.6973
Solution: 11 @ 126.0146
Exiting plus10 for input 2 @ 5124.4074
Solution: 12 @ 5124.9014
Solution: 13 @ 5126.4834

看一下我关于让网格忽略保持顺序的答案 - kseen
2个回答

5

TPL数据流保证输入和输出队列的顺序,无论有多少项并行处理。

“因为每个预定义的源数据流块类型都保证按接收顺序传播消息,所以必须从源块读取每个消息,然后源块才能处理下一个消息。”

来自Dataflow(任务并行库)

如果您希望在处理完成时立即将项目移动到下一个块,则应显式地转移它们,这将使您的TransformBlock变成一个ActionBlock

var printSolution = new ActionBlock<int>(input =>
{
    Console.WriteLine("Solution: {0} @ {1}", input, watch.Elapsed.TotalMilliSeconds);
},executionDataflowBlockOptions);

var plus10 = new ActionBlock<int>(async input =>
{
    if (input == 2)
    {
        await Task.Delay(5000);
    }
    Console.WriteLine("Exiting plus10 for input {0} @ {1}", input, watch.Elapsed);
    await printSolution.SendAsync(input + 10);
}, executionDataflowBlockOptions);

3

从至少 System.Threading.Tasks.Dataflow.4.6.0 版本开始,ExecutionDataflowBlockOptions 现在有一个属性 EnsureOrdered,可以设置为 false

更新方式:

Install-Package System.Threading.Tasks.Dataflow

代码:

var options = new ExecutionDataflowBlockOptions {
  EnsureOrdered = false
};
var transform = new TransformBlock<int, int>(i => Transform(i), options);

一些更多的例子:https://dev59.com/A5nga4cB1Zd3GeqPb67o#38865414 我认为很有趣的开发历史:https://github.com/dotnet/corefx/issues/536 https://github.com/dotnet/corefx/pull/5191

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接