你的问题的答案是:不,你不需要另一种块类型;是的,你可以两次使用 TransformManyBlock;它是有意义的。我写了一些代码来证明它,并放在底部,还有一些关于它如何工作的注释。
代码使用分裂然后合并的管道,就像你描述的那样。至于你遇到的问题:将单个文件的数据合并在一起可以通过在可用时将处理过的项添加到列表中来完成。只有当列表具有预期的最终条目数时,我们才将其传递给下一个块。这可以使用相当简单的 TransformMany 块返回零或一个项来完成。该块不能并行化,因为该列表不是线程安全的。
一旦你有了这样的管道,你就可以通过仅使用传递给块的选项来测试并行化和排序。下面的代码将并行化设置为每个块都是无界的,并让 DataFlow 代码解决问题。在我的机器上,它利用了所有的核心/逻辑处理器,是 CPU 绑定的,这正是我们想要的。启用了排序,但关闭它并没有太大的区别:我们仍然是 CPU 绑定的。
最后我必须说这是一个非常酷的技术,但实际上你可以使用PLINQ更简单地解决这个问题,只需几行代码就能得到同样快的结果。最大的缺点是如果你这样做,你不能轻松地逐步添加快速到达的消息到管道中:PLINQ更适合于一个大批处理过程。然而,对于您的用例,PLINQ可能是更好的解决方案。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;
namespace ParallelDataFlow
{
class Program
{
static void Main(string[] args)
{
new Program().Run();
Console.ReadLine();
}
private void Run()
{
Stopwatch s = new Stopwatch();
s.Start();
var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
var getInputPathsBlock = new TransformManyBlock<(int, int), WorkItem>(date => GetWorkItemWithInputPath(date), options);
var loadDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => LoadDataIntoWorkItem(workItem), options);
var processDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => ProcessDataForWorkItem(workItem), options);
var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem => WaitForWorkItems(workItem));
var mergeDataBlock = new TransformBlock<List<WorkItem>, List<WorkItem>>(list => MergeWorkItemData(list), options);
var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list), options);
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
loadDataBlock.LinkTo(processDataBlock, linkOptions);
processDataBlock.LinkTo(waitForProcessedDataBlock, linkOptions);
waitForProcessedDataBlock.LinkTo(mergeDataBlock, linkOptions);
mergeDataBlock.LinkTo(saveDataBlock, linkOptions);
getInputPathsBlock.Post((1903, 2));
var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
foreach (var date in dates) getInputPathsBlock.Post(date);
getInputPathsBlock.Complete();
saveDataBlock.Completion.Wait();
s.Stop();
Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
}
private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year, int month) date)
{
List<WorkItem> processedWorkItems = new List<WorkItem>();
return GetInputPaths(date.year, date.month).Select(
path => new WorkItem
{
Year = date.year,
Month = date.month,
FilePath = path,
ProcessedWorkItems = processedWorkItems
});
}
private IEnumerable<string> GetInputPaths(int year, int month) =>
Enumerable.Range(0, GetNumberOfFiles(year, month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");
private int GetNumberOfFiles(int year, int month) => DateTime.DaysInMonth(year, month);
private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
workItem.RawData = LoadData(workItem.FilePath);
return workItem;
}
private string LoadData(string path) => "This is content from file " + path;
private WorkItem ProcessDataForWorkItem(WorkItem workItem)
{
workItem.ProcessedData = ProcessData(workItem.RawData);
return workItem;
}
private string ProcessData(string contents)
{
Thread.SpinWait(11000000);
return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
}
private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
{
List<WorkItem> itemList = workItem.ProcessedWorkItems;
itemList.Add(workItem);
return itemList.Count == GetNumberOfFiles(workItem.Year, workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
}
private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
{
string finalContents = "";
foreach (WorkItem workItem in processedWorkItems)
{
finalContents = MergeData(finalContents, workItem.ProcessedData);
}
processedWorkItems[0].MergedData = finalContents;
return processedWorkItems;
}
private string MergeData(string output1, string output2) => output1 != "" ? output1 + "\n" + output2 : output2;
private void SaveWorkItemData(List<WorkItem> workItems)
{
WorkItem result = workItems[0];
SaveData(result.MergedData, result.Year, result.Month);
Console.WriteLine($"Saved data block for {DateToString((result.Year, result.Month))} on {ThreadAndTime()}." +
$" File contents:\n{result.MergedData}\n");
}
private void SaveData(string finalContents, int year, int month)
{
new DirectoryInfo("Results").Create();
File.WriteAllText(Path.Combine("Results", $"results{year}{Pad(month)}.txt"), finalContents);
}
private string DateToString((int year, int month) date) => date.year + Pad(date.month);
private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
}
public class WorkItem
{
public int Year { get; set; }
public int Month { get; set; }
public string FilePath { get; set; }
public string RawData { get; set; }
public string ProcessedData { get; set; }
public List<WorkItem> ProcessedWorkItems { get; set; }
public string MergedData { get; set; }
}
}
此代码从每个块中传递一个WorkItem对象到下一个,并在每个阶段对其进行丰富。之后,它创建了一个包含一个月内所有WorkItems的最终列表,然后在其中运行聚合过程并保存结果。
该代码基于使用您使用的名称的虚拟方法。这些不做太多事情,但希望能展示解决方案。例如,LoadData被传递文件路径,只是向其添加一些文本并将字符串传递,但如果实际上磁盘上有文件,它当然可以加载真实文件并将内容字符串传递。
类似地,为了模拟在ProcessData中进行工作,我们执行Thread.SpinWait,然后再次向字符串添加一些文本。这就是延迟的来源,因此如果您希望运行得更快或更慢,则可以更改数字。该代码是基于.NET Framework编写的,但它可以运行在Core 3.0、Ubuntu和OSX上。唯一的区别是SpinWait周期可能会明显长或短,因此您可能需要调整延迟。
请注意,我们本可以合并waitForProcessedDataBlock,然后完全按照您的要求建立管道。只是这可能会更加令人困惑。
代码最终会在磁盘上创建文件,但也会将结果输出到屏幕上,因此实际上并不需要。
如果将并行化设置为1,您会发现它的速度会减慢大约您所期望的量。我的Windows机器是四核的,速度比四倍慢一点。
LoadDataBlock1
,LoadDataBlock2
等等只会接收一个要处理的项目?每个块都只有一个inputPath
吗? - Theodor ZouliasLoadDataBlock
(一个看起来就足够了),但关于你问题的本质,特别是合并部分,你尝试过将所有的ProcessDataBlock
简单地LinkTo
到单个的MergeDataBlock
吗? - Theodor Zouliasvar LoadDataBlock = Transform....
仅会出现一次(同样适用于ProcessDataBlock
),但它应该针对每个输入路径单独运行。这有意义吗? - Little geek