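Scope:
- I want to split a large file (1 GB+) into smaller, manageable partitions, persist them on some storage infrastructure (local disk, blob storage, network, etc.), and process them one by one in memory.
- I want to achieve this with the TPL Dataflow library, so I created several processing blocks, each performing a specific operation on an in-memory file partition.
- In addition, I am using a SemaphoreSlim object to limit the maximum number of in-memory partitions being processed at any given time; a slot is held from the moment a partition is loaded until it is fully processed.
- I am also using the MaxDegreeOfParallelism configuration property at block level to limit the degree of parallelism of each block.
From a technical perspective, the scope is to use a semaphore to limit how many partitions are processed in parallel across several consecutive pipeline steps, and thus avoid overloading the memory.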
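Problem description: When MaxDegreeOfParallelism is set to a value greater than 1 for all Dataflow blocks except the first one, the process hangs and appears to be deadlocked. When MaxDegreeOfParallelism is set to 1, everything works as expected. Code sample below...
Do you have any ideas, hints, or tips as to why this happens?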
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DemoConsole
{
    class Program
    {
        private static readonly SemaphoreSlim _localSemaphore = new(1);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Configuring pipeline...");
            var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };
            var filter1 = new TransformManyBlock<string, PartitionInfo>(CreatePartitionsAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            // when MaxDegreeOfParallelism on the below line is set to 1, everything works as expected; any value greater than 1 causes issues
            var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };
            var filter2 = new TransformBlock<PartitionInfo, PartitionInfo>(ReadPartitionAsync, blockOptions);
            var filter3 = new TransformBlock<PartitionInfo, PartitionInfo>(MapPartitionAsync, blockOptions);
            var filter4 = new TransformBlock<PartitionInfo, PartitionInfo>(ValidatePartitionAsync, blockOptions);
            var actionBlock = new ActionBlock<PartitionInfo>(async (x) => { await Task.CompletedTask; });

            filter1.LinkTo(filter2, dataflowLinkOptions);
            filter2.LinkTo(filter3, dataflowLinkOptions);
            filter3.LinkTo(filter4, dataflowLinkOptions);
            filter4.LinkTo(actionBlock, dataflowLinkOptions);

            await filter1.SendAsync("my-file.csv");
            filter1.Complete();
            await actionBlock.Completion;

            Console.WriteLine("Pipeline completed.");
            Console.ReadKey();
            Console.WriteLine("Done");
        }

        private static async Task<IEnumerable<PartitionInfo>> CreatePartitionsAsync(string input)
        {
            var partitions = new List<PartitionInfo>();
            const int noOfPartitions = 10;
            Log($"Creating {noOfPartitions} partitions from raw file on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            for (short i = 1; i <= noOfPartitions; i++)
            {
                partitions.Add(new PartitionInfo { FileName = $"{Path.GetFileNameWithoutExtension(input)}-p{i}-raw.json", Current = i });
            }
            await Task.CompletedTask;
            Log($"Creating {noOfPartitions} partitions from raw file completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return partitions;
        }

        private static async Task<PartitionInfo> ReadPartitionAsync(PartitionInfo input)
        {
            Log($"Semaphore - trying to enter for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            // The slot acquired here is held until ValidatePartitionAsync releases it.
            await _localSemaphore.WaitAsync();
            Log($"Semaphore - entered for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            Log($"Reading partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Reading partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return input;
        }

        private static async Task<PartitionInfo> MapPartitionAsync(PartitionInfo input)
        {
            Log($"Mapping partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Mapping partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            return input;
        }

        private static async Task<PartitionInfo> ValidatePartitionAsync(PartitionInfo input)
        {
            Log($"Validating partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Validating partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");
            Log($"Semaphore - releasing - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            // Releases the slot that ReadPartitionAsync acquired for this partition.
            _localSemaphore.Release();
            Log($"Semaphore - released - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            return input;
        }

        private static void Log(string message) => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} : {message}");
    }

    class PartitionInfo
    {
        public string FileName { get; set; }
        public short Current { get; set; }
    }
}
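
One plausible explanation for the hang, offered as an assumption rather than a confirmed diagnosis: a TransformBlock preserves input order by default (EnsureOrdered is true). With MaxDegreeOfParallelism greater than 1, a partition other than the oldest pending one can win the single semaphore slot. Its result is then held back behind older partitions that are still waiting in WaitAsync, so it never reaches the step that calls Release. The sketch below is a reduced two-stage pipeline that sets EnsureOrdered = false on the acquiring block so a finished item can move on immediately. The names OrderingDemo, RunAsync, itemCount and maxInFlight are hypothetical and not part of the code above.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderingDemo
{
    // Runs a two-stage pipeline: the first stage acquires a semaphore slot,
    // the second stage releases it. Returns how many items completed both stages.
    public static async Task<int> RunAsync(int itemCount, int maxInFlight)
    {
        var semaphore = new SemaphoreSlim(maxInFlight);
        var processed = 0;

        // Assumption: disabling ordering on the acquiring block lets whichever
        // item holds the slot leave the block without waiting for older items.
        var acquireOptions = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            EnsureOrdered = false
        };
        var releaseOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

        var read = new TransformBlock<int, int>(async id =>
        {
            await semaphore.WaitAsync();   // slot held until the last stage
            await Task.Delay(50);          // stands in for loading the partition
            return id;
        }, acquireOptions);

        var validate = new ActionBlock<int>(async id =>
        {
            try
            {
                await Task.Delay(50);      // stands in for the remaining work
                Interlocked.Increment(ref processed);
            }
            finally
            {
                semaphore.Release();       // always give the slot back
            }
        }, releaseOptions);

        read.LinkTo(validate, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= itemCount; i++)
        {
            await read.SendAsync(i);
        }
        read.Complete();
        await validate.Completion;
        return processed;
    }

    public static async Task Main()
    {
        var count = await RunAsync(itemCount: 10, maxInFlight: 1);
        Console.WriteLine($"Processed {count} partitions.");
    }
}
```

If output order matters downstream, the order can be restored after the releasing step (for example by sorting on a sequence number such as Current), since nothing past that point waits on the semaphore.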
Using a SemaphoreSlim is unusual. To coordinate multiple blocks, take a look at ConcurrentExclusiveTaskScheduler. - Stephen Cleary
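
As a rough illustration of the scheduler-based coordination the comment hints at: .NET provides a type named ConcurrentExclusiveSchedulerPair, and the sketch below assumes that is the kind of scheduler meant. Its ConcurrentScheduler is shared by two blocks through ExecutionDataflowBlockOptions.TaskScheduler, which caps how many delegates run at the same time across both blocks. Note the caveats: the cap applies only to synchronously executing code (an async delegate gives up its slot at each await), and it bounds concurrency, not the number of partitions held in memory. SharedSchedulerDemo, RunAsync and Work are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SharedSchedulerDemo
{
    // Returns the highest number of delegates observed running simultaneously
    // across both blocks.
    public static async Task<int> RunAsync(int itemCount, int maxConcurrency)
    {
        // One scheduler pair shared by every block that should count toward the cap.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            TaskScheduler = pair.ConcurrentScheduler
        };

        int running = 0, peak = 0;

        // Synchronous stand-in for a processing step; tracks peak concurrency.
        int Work(int id)
        {
            var now = Interlocked.Increment(ref running);
            int seen;
            while ((seen = Volatile.Read(ref peak)) < now &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
                // retry until the peak is recorded or another thread recorded a higher one
            }
            Thread.Sleep(20);
            Interlocked.Decrement(ref running);
            return id;
        }

        var map = new TransformBlock<int, int>(id => Work(id), options);
        var validate = new ActionBlock<int>(id => { Work(id); }, options);
        map.LinkTo(validate, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= itemCount; i++)
        {
            map.Post(i);
        }
        map.Complete();
        await validate.Completion;
        return peak;
    }

    public static async Task Main()
    {
        var peak = await RunAsync(itemCount: 20, maxConcurrency: 2);
        Console.WriteLine($"Peak concurrent delegates: {peak}");
    }
}
```

Because each block keeps its own MaxDegreeOfParallelism, the shared scheduler acts as an additional ceiling on top of the per-block limits rather than a replacement for them.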