我如何拆分和合并这个数据流管道?

4

我正在尝试使用Tpl创建一个具有以下形式的数据流:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

这个想法是,GetInputPathsBlock是一个块,它找到要加载的输入数据路径,然后将每个路径发送到每个LoadDataBlock。所有的LoadDataBlocks都是相同的(除了它们从GetInputPaths接收到一个唯一的inputPath字符串)。加载的数据然后被发送到ProcessDataBlock,进行一些简单的处理。然后来自每个ProcessDataBlock的数据被发送到MergeDataBlock,合并它并发送到SaveDataBlock,然后保存到文件中。
把它想象成一个需要每个月运行的数据流。首先找到每天数据的路径,加载和处理每天的数据,然后为整个月份合并和保存。每个月都可以并行运行,每个月中每天的数据可以并行加载和处理(在加载完单独的每日数据之后),一旦每月的所有数据都被加载和处理完毕,就可以合并和保存。
根据我的理解,可以使用TransformManyBlock<TInput,string>来执行拆分(GetInputPathsBlock),并将其链接到普通的TransformBlock<string,InputData>LoadDataBlock),然后连接到另一个TransformBlock<InputData,ProcessedData>ProcessDataBlock),但是我不知道如何将其合并回单个块。
我找到了这个答案,它使用TransformManyBlockIEnumerable<item>转换为item,但我不完全理解它,并且我无法将TransformBlock<InputData,ProcessedData>ProcessDataBlock)连接到TransformBlock<IEnumerable<ProcessedData>>,ProcessedData>,所以我不知道如何使用它。
我还看到了这样的答案,它建议使用JoinBlock,但输入文件数N变化,而且所有文件都以相同方式加载。
还有一个答案,似乎可以做到我想要的,但我不完全理解它,并且不知道如何将字典设置转移到我的情况中。
如何拆分和合并数据流?
  • 我是否遗漏了某个块类型?
  • 我能否以某种方式两次使用TransformManyBlock
  • 对于拆分/合并,tpl是否有意义,还是有更简单的异步/等待方式?

那么每个 LoadDataBlock1LoadDataBlock2 等等只会接收一个要处理的项目?每个块都只有一个 inputPath 吗? - Theodor Zoulias
没错。每个输入路径都会被发送到它自己的LoadDataBlock。 - Little geek
我会质疑是否需要多个LoadDataBlock(一个看起来就足够了),但关于你问题的本质,特别是合并部分,你尝试过将所有的ProcessDataBlock简单地LinkTo到单个的MergeDataBlock吗? - Theodor Zoulias
我认为我们之间存在误解。当我说每个输入路径都将被发送到其自己的LoadDataBlock时,我的意思是该块只定义一次,但应与每个输入路径一起使用。行var LoadDataBlock = Transform....仅会出现一次(同样适用于ProcessDataBlock),但它应该针对每个输入路径单独运行。这有意义吗? - Little geek
2个回答

3

我会使用嵌套的块来避免分割我的月度数据,然后再进行合并。这是两个嵌套的TransformBlock的例子,它们处理整个2020年的所有日子:

var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();

我使用了下面的扩展方法ToListAsync来收集内部块的每日结果:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    await block.Completion.ConfigureAwait(false); // Propagate possible exception
    return list;
}

谢谢,我认为这就是我正在寻找的东西,我会试一下。你使用MaxDegreeOfParallelism = 1来处理monthlyBlock有特别的原因吗? - Little geek
避免过度并行化是出于以下原因。例如,如果每个日常工作都需要从文件夹中读取一个大文件,则从同一物理磁盘驱动器并发读取4个文件可能会使进程变慢而不是更快,因为磁盘的头不能同时在4个位置上。如果驱动器是固态硬盘,则可以进行更多的并行化。另一方面,如果每个工作主要是计算,则理想的并行度是机器的核心数。这非常取决于您正在做什么以及最常使用的硬件的能力。 - Theodor Zoulias
总的并行度是内部和外部代码块的“最大并行度”乘积。因此,如果外部为4,内部为4,则总数为16。 - Theodor Zoulias
@Littlegeek,我在类似问题的帖子中发布了一个更复杂的答案,链接在这里:https://dev59.com/018d5IYBdhLWcg3wsDtW#65085413。 - Theodor Zoulias

0
你的问题的答案是:不,你不需要另一种块类型;是的,你可以两次使用 TransformManyBlock;它是有意义的。我写了一些代码来证明它,并放在底部,还有一些关于它如何工作的注释。
代码使用分裂然后合并的管道,就像你描述的那样。至于你遇到的问题:将单个文件的数据合并在一起可以通过在可用时将处理过的项添加到列表中来完成。只有当列表具有预期的最终条目数时,我们才将其传递给下一个块。这可以使用相当简单的 TransformMany 块返回零或一个项来完成。该块不能并行化,因为该列表不是线程安全的。
一旦你有了这样的管道,你就可以通过仅使用传递给块的选项来测试并行化和排序。下面的代码将并行化设置为每个块都是无界的,并让 DataFlow 代码解决问题。在我的机器上,它利用了所有的核心/逻辑处理器,是 CPU 绑定的,这正是我们想要的。启用了排序,但关闭它并没有太大的区别:我们仍然是 CPU 绑定的。
最后我必须说这是一个非常酷的技术,但实际上你可以使用PLINQ更简单地解决这个问题,只需几行代码就能得到同样快的结果。最大的缺点是如果你这样做,你不能轻松地逐步添加快速到达的消息到管道中:PLINQ更适合于一个大批处理过程。然而,对于您的用例,PLINQ可能是更好的解决方案。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ParallelDataFlow
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Run();
            Console.ReadLine();
        }

        private void Run()
        {
            Stopwatch s = new Stopwatch();
            s.Start();

            // Can  experiment with parallelization of blocks by changing MaxDegreeOfParallelism
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var getInputPathsBlock = new TransformManyBlock<(int, int), WorkItem>(date => GetWorkItemWithInputPath(date), options);
            var loadDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => LoadDataIntoWorkItem(workItem), options);
            var processDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => ProcessDataForWorkItem(workItem), options);
            var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem => WaitForWorkItems(workItem));  // Can't parallelize this block
            var mergeDataBlock = new TransformBlock<List<WorkItem>, List<WorkItem>>(list => MergeWorkItemData(list), options);
            var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list), options);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
            loadDataBlock.LinkTo(processDataBlock, linkOptions);
            processDataBlock.LinkTo(waitForProcessedDataBlock, linkOptions);
            waitForProcessedDataBlock.LinkTo(mergeDataBlock, linkOptions);
            mergeDataBlock.LinkTo(saveDataBlock, linkOptions);

            // We post individual tuples of (year, month) to our pipeline, as many as we want
            getInputPathsBlock.Post((1903, 2));  // Post one month and date
            var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
            foreach (var date in dates) getInputPathsBlock.Post(date);  // Post a big sequence         

            getInputPathsBlock.Complete();
            saveDataBlock.Completion.Wait();
            s.Stop();
            Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
        }

        private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year, int month) date)
        {
            List<WorkItem> processedWorkItems = new List<WorkItem>();  // Will store merged results
            return GetInputPaths(date.year, date.month).Select(
                path => new WorkItem
                {
                    Year = date.year,
                    Month = date.month,
                    FilePath = path,
                    ProcessedWorkItems = processedWorkItems
                });
        }

        // Get filepaths of form e.g. Files/20191101.txt  These aren't real files, they just show how it could work.
        private IEnumerable<string> GetInputPaths(int year, int month) =>
            Enumerable.Range(0, GetNumberOfFiles(year, month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");

        private int GetNumberOfFiles(int year, int month) => DateTime.DaysInMonth(year, month);

        private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
            workItem.RawData = LoadData(workItem.FilePath);
            return workItem;
        }

        // Simulate loading by just concatenating to path: in real code this could open a real file and return the contents
        private string LoadData(string path) => "This is content from file " + path;

        private WorkItem ProcessDataForWorkItem(WorkItem workItem)
        {
            workItem.ProcessedData = ProcessData(workItem.RawData);
            return workItem;
        }

        private string ProcessData(string contents)
        {
            Thread.SpinWait(11000000); // Use 11,000,000 for ~50ms on Windows .NET Framework.  1,100,000 on Windows .NET Core.
            return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
        }

        // Adds a processed WorkItem to its ProcessedWorkItems list.  Then checks if the list has as many processed WorkItems as we 
        // expect to see overall.  If so the list is returned to the next block, if not we return an empty array, which passes nothing on.
        // This isn't threadsafe for the list, so has to be called with MaxDegreeOfParallelization = 1
        private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
        {
            List<WorkItem> itemList = workItem.ProcessedWorkItems;
            itemList.Add(workItem);
            return itemList.Count == GetNumberOfFiles(workItem.Year, workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
        }

        private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
        {
            string finalContents = "";
            foreach (WorkItem workItem in processedWorkItems)
            {
                finalContents = MergeData(finalContents, workItem.ProcessedData);
            }
            // Should really create a new data structure and return that, but let's cheat a bit
            processedWorkItems[0].MergedData = finalContents;
            return processedWorkItems;
        }

        // Just concatenate the output strings, separated by newlines, to merge our data
        private string MergeData(string output1, string output2) => output1 != "" ? output1 + "\n" + output2 : output2;

        private void SaveWorkItemData(List<WorkItem> workItems)
        {
            WorkItem result = workItems[0];
            SaveData(result.MergedData, result.Year, result.Month);
            // Code to show it's worked...
            Console.WriteLine($"Saved data block for {DateToString((result.Year, result.Month))} on {ThreadAndTime()}." +
                              $"  File contents:\n{result.MergedData}\n");
        }
        private void SaveData(string finalContents, int year, int month)
        {
            // Actually save, although don't really need to in this test code
            new DirectoryInfo("Results").Create();
            File.WriteAllText(Path.Combine("Results", $"results{year}{Pad(month)}.txt"), finalContents);
        }

        // Helper methods
        private string DateToString((int year, int month) date) => date.year + Pad(date.month);
        private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
        private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
    }

    public class WorkItem
    {
        public int Year { get; set; }
        public int Month { get; set; }
        public string FilePath { get; set; }
        public string RawData { get; set; }
        public string ProcessedData { get; set; }
        public List<WorkItem> ProcessedWorkItems { get; set; }
        public string MergedData { get; set; }
    }
}

此代码从每个块中传递一个WorkItem对象到下一个,并在每个阶段对其进行丰富。之后,它创建了一个包含一个月内所有WorkItems的最终列表,然后在其中运行聚合过程并保存结果。

该代码基于使用您使用的名称的虚拟方法。这些不做太多事情,但希望能展示解决方案。例如,LoadData被传递文件路径,只是向其添加一些文本并将字符串传递,但如果实际上磁盘上有文件,它当然可以加载真实文件并将内容字符串传递。

类似地,为了模拟在ProcessData中进行工作,我们执行Thread.SpinWait,然后再次向字符串添加一些文本。这就是延迟的来源,因此如果您希望运行得更快或更慢,则可以更改数字。该代码是基于.NET Framework编写的,但它可以运行在Core 3.0、Ubuntu和OSX上。唯一的区别是SpinWait周期可能会明显长或短,因此您可能需要调整延迟。

请注意,我们本可以合并waitForProcessedDataBlock,然后完全按照您的要求建立管道。只是这可能会更加令人困惑。

代码最终会在磁盘上创建文件,但也会将结果输出到屏幕上,因此实际上并不需要。

如果将并行化设置为1,您会发现它的速度会减慢大约您所期望的量。我的Windows机器是四核的,速度比四倍慢一点。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接