TPL数据流转换块后跟批量块,然后是操作块

3
我有一个基于TPL Dataflow的应用程序,最初只使用批处理块和动作块就可以正常工作。
我添加了一个TransformBlock来尝试在将数据发送到批处理块之前转换源数据,但我的动作块从未被触发。没有任何错误或异常被抛出。
我不确定是否需要完成我的TransformBlock,因为它似乎只被触发了一次。
除了返回输出类型的对象之外,我是否需要添加其他步骤到我的转换代码中?
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock);
            batchBlock.LinkTo(uploadFilesToAzureBlock);
            batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            batchBlock.Complete();
            uploadFilesToAzureBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input += " has been processed";
        }
    }
}

我已更新代码,提供完整可测试的控制台应用程序,该应用程序存在相同的问题,但不是片段。 - Mark McGookin
以上代码已发布所有名称,但没有任何输出。 - Mark McGookin
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Mark McGookin
1
在所有链接上使用PropagateCompletion,并完成转换块。那应该可以了。 - usr
1个回答

2
正如上面的“usr”所提到的,我没有传播块的完成。以下代码完美运行。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.LinkTo(outputStringsBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.Completion.ContinueWith(obj => outputStringsBlock.Complete());

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine("");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("");
            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input += " has been processed";
        }
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接