TPL DataFlow 工作流程

3
我刚开始阅读TPL Dataflow,对此感到困惑。虽然我已经阅读了很多与此相关的文章,但我仍然无法轻松理解。也许这很难,也许我还没有开始掌握这个想法。
我开始研究这个问题的原因是,我想实现一个可以按顺序运行并行任务的场景,并发现TPL Dataflow可以用于此目的。
我同时练习TPL和TPL Dataflow,并处于初学者水平,因此需要专家帮助指导正确方向。在我的测试方法中,我已经完成了以下事情:
private void btnTPLDataFlow_Click(object sender, EventArgs e)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        txtOutput.Clear();

        ExecutionDataflowBlockOptions execOptions = new ExecutionDataflowBlockOptions();
        execOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;

        ActionBlock<string> actionBlock = new ActionBlock<string>(async v =>
        {
            await Task.Delay(200);
            await Task.Factory.StartNew(

                () => txtOutput.Text += v + Environment.NewLine, 
                CancellationToken.None,
                TaskCreationOptions.None,
                scheduler
                );

        }, execOptions);

        for (int i = 1; i < 101; i++)
        {
            actionBlock.Post(i.ToString());
        }

        actionBlock.Complete();

        watch.Stop();
        lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
    }

现在,该过程是并行的和异步的(不会冻结我的用户界面),但生成的输出顺序不正确,而我已经读到TPL Dataflow默认保持元素的顺序。所以我的猜测是,我创建的任务是罪魁祸首,它没有按正确的顺序输出字符串。我对吗?
如果是这种情况,那么我怎样才能使这个过程既异步又有序?
我尝试将代码分离并将代码分配到不同的方法中,但我的尝试失败了,因为只有字符串被输出到文本框,什么也没有发生。
 private async void btnTPLDataFlow_Click(object sender, EventArgs e)
    {
        Stopwatch watch = new Stopwatch();
        watch.Start();

        await TPLDataFlowOperation();

        watch.Stop();
        lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
    }

    public async Task TPLDataFlowOperation()
    {
        var actionBlock = new ActionBlock<int>(async values => txtOutput.Text += await ProcessValues(values) + Environment.NewLine,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler = scheduler });

        for (int i = 1; i < 101; i++)
        {
            actionBlock.Post(i);
        }

        actionBlock.Complete();
        await actionBlock.Completion;
    }

    private async Task<string> ProcessValues(int i)
    {
        await Task.Delay(200);
        return "Test " + i;
    }

我知道我写的代码不好,但这是我第一次尝试TPL Dataflow。


你为什么一开始就使用数据流呢?据我所知,你没有使用它的任何功能。看起来异步方法在这里完全可以胜任。 - usr
1个回答

1
我该如何使这个异步且有序?
这有点矛盾。你可以让并发任务按顺序开始,但你无法保证它们会按顺序运行或完成。
让我们检查一下你的代码并看看发生了什么。
首先,你选择了DataflowBlockOptions.Unbounded。这告诉TPL Dataflow它不应限制允许并发运行的任务数。因此,你的每个任务将在大致相同的时间内按顺序开始。
你的异步操作始于await Task.Delay(200)。这将导致你的方法被挂起,然后在大约200毫秒后恢复。但是,这种延迟不是精确的,并且会因每次调用而异。此外,在延迟之后恢复你的代码的机制可能需要不同的时间。由于实际延迟的随机变化,下一个要运行的代码现在< strong>不是按顺序 - 这就是你看到的差异。
你可能会发现这个例子很有趣。这是一个控制台应用程序,以简化事情。
class Program
{
    static void Main(string[] args)
    {
        OutputNumbersWithDataflow();
        OutputNumbersWithParallelLinq();

        Console.ReadLine();
    }

    private static async Task HandleStringAsync(string s)
    {
        await Task.Delay(200);
        Console.WriteLine("Handled {0}.", s);
    }

    private static void OutputNumbersWithDataflow()
    {
        var block = new ActionBlock<string>(
            HandleStringAsync,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        for (int i = 0; i < 20; i++)
        {
            block.Post(i.ToString());
        }

        block.Complete();

        block.Completion.Wait();
    }

    private static string HandleString(string s)
    {
        // Perform some computation on s...
        Thread.Sleep(200);

        return s;
    }

    private static void OutputNumbersWithParallelLinq()
    {
        var myNumbers = Enumerable.Range(0, 20).AsParallel()
                                               .AsOrdered()
                                               .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                                               .WithMergeOptions(ParallelMergeOptions.NotBuffered);

        var processed = from i in myNumbers
                        select HandleString(i.ToString());

        foreach (var s in processed)
        {
            Console.WriteLine(s);
        }
    }
}

第一组数字是使用类似于您的TPL Dataflow方法计算出来的。这些数字是无序的。
第二组数字由OutputNumbersWithParallelLinq()输出,根本不使用Dataflow。它依赖于内置在.NET中的Parallel LINQ功能。这会在后台线程上运行我的HandleString()方法,但保持数据有序直到结束。
这里的限制是PLINQ不允许您提供异步方法。(好吧,您可以,但它不会给您所需的行为。)HandleString()是一个传统的同步方法;它只是在后台线程上执行。
这里还有一个更复杂的Dataflow示例,可以保持正确的顺序:
private static void OutputNumbersWithDataflowTransformBlock()
{
    Random r = new Random();
    var transformBlock = new TransformBlock<string, string>(
        async s =>
        {
            // Make the delay extra random, just to be sure.
            await Task.Delay(160 + r.Next(80));
            return s;
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

    // For a GUI application you should also set the
    // scheduler here to make sure the output happens
    // on the correct thread.
    var outputBlock = new ActionBlock<string>(
        s => Console.WriteLine("Handled {0}.", s),
        new ExecutionDataflowBlockOptions
            { 
                SingleProducerConstrained = true,
                MaxDegreeOfParallelism = 1
            });

    transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });

    for (int i = 0; i < 20; i++)
    {
        transformBlock.Post(i.ToString());
    }

    transformBlock.Complete();

    outputBlock.Completion.Wait();
}

非常感谢您的详细解释。虽然我不能说我完全理解了所有内容,但现在我对DataFlow有了更深刻的认识。但我仍然不明白为什么DataFlow不能保持顺序。我无法理解的主要原因是这个链接:https://dev59.com/CHzaa4cB1Zd3GeqPKwov#21827118。 - Adnan Yaseen
关键在于每个步骤都要按正确的顺序开始。数据流无法控制代码内部发生的事情。想象一下自行车比赛有几个阶段。自行车手按顺序出发,但到了阶段结束时,顺序已经改变。然后他们再次按顺序开始(根据我们的想象规则)进行下一个阶段。 - Olly
谢谢,我现在明白了。我之前发布了另一个关于DataFlow的问题,但是针对另一种情况,得到的答案是输入块中的数据没有顺序,但输出时它们是有序的。我发现我需要使用TransformBlock来实现这个目的。 - Adnan Yaseen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接