TPL Dataflow使用SemaphoreSlim进行限流时出现的问题

4

范畴:

  • 我想把一个大文件(1GB+)分割成小块(可管理的),将它们存储在某种存储基础设施上(本地磁盘、Blob、网络等),并逐一在内存中处理它们。
  • 我想通过利用TPL Dataflow库来实现这一目标,并创建了多个处理块,每个块执行特定操作,在内存中的文件分区上执行。
  • 此外,我正在使用SemaphoreSlim对象来限制在任何时候正在处理的内存分区的最大数量,直到其被加载并完全处理。
  • 我还在块级别使用MaxDegreeOfParallelism配置属性来限制每个块的并行度。

从技术角度来看,范围是通过使用信号量在多个连续的管道步骤中限制并行处理多个分区的处理,从而避免过载内存。

问题描述:当除第一个块以外的所有Dataflow块的MaxDegreeOfParallelism设置为大于1的值时,进程会挂起并似乎陷入死锁状态。当MaxDegreeOfParallelism设置为1时,一切都按预期工作。以下是代码示例...

您有任何想法/提示/建议为什么会发生这种情况吗?

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DemoConsole
{
    class Program
    {
        private static readonly SemaphoreSlim _localSemaphore = new(1);

        static async Task Main(string[] args)
        {
            Console.WriteLine("Configuring pipeline...");

            var dataflowLinkOptions = new DataflowLinkOptions() { PropagateCompletion = true };

            var filter1 = new TransformManyBlock<string, PartitionInfo>(CreatePartitionsAsync, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

            // when MaxDegreeOfParallelism on the below line is set to 1, everything works as expected; any value greater than 1 causes issues              
            var blockOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 };

            var filter2 = new TransformBlock<PartitionInfo, PartitionInfo>(ReadPartitionAsync, blockOptions);
            var filter3 = new TransformBlock<PartitionInfo, PartitionInfo>(MapPartitionAsync, blockOptions);
            var filter4 = new TransformBlock<PartitionInfo, PartitionInfo>(ValidatePartitionAsync, blockOptions);

            var actionBlock = new ActionBlock<PartitionInfo>(async (x) => { await Task.CompletedTask; });

            filter1.LinkTo(filter2, dataflowLinkOptions);
            filter2.LinkTo(filter3, dataflowLinkOptions);
            filter3.LinkTo(filter4, dataflowLinkOptions);
            filter4.LinkTo(actionBlock, dataflowLinkOptions);

            await filter1.SendAsync("my-file.csv");

            filter1.Complete();

            await actionBlock.Completion;

            Console.WriteLine("Pipeline completed.");
            Console.ReadKey();
            Console.WriteLine("Done");
        }

        private static async Task<IEnumerable<PartitionInfo>> CreatePartitionsAsync(string input)
        {
            var partitions = new List<PartitionInfo>();
            const int noOfPartitions = 10;

            Log($"Creating {noOfPartitions} partitions from raw file on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");

            for (short i = 1; i <= noOfPartitions; i++)
            {
                partitions.Add(new PartitionInfo { FileName = $"{Path.GetFileNameWithoutExtension(input)}-p{i}-raw.json", Current = i });
            }

            await Task.CompletedTask;

            Log($"Creating {noOfPartitions} partitions from raw file completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return partitions;
        }

        private static async Task<PartitionInfo> ReadPartitionAsync(PartitionInfo input)
        {
            Log($"Sempahore - trying to enter for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            await _localSemaphore.WaitAsync();
            Log($"Sempahore - entered for partition [{input.Current}] - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

            Log($"Reading partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Reading partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return input;
        }

        private static async Task<PartitionInfo> MapPartitionAsync(PartitionInfo input)
        {
            Log($"Mapping partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Mapping partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            return input;
        }

        private static async Task<PartitionInfo> ValidatePartitionAsync(PartitionInfo input)
        {
            Log($"Validating partition [{input.Current}] on Thread [{Thread.CurrentThread.ManagedThreadId}] ...");
            await Task.Delay(1000);
            Log($"Validating partition [{input.Current}] completed on Thread [{Thread.CurrentThread.ManagedThreadId}].");

            Log($"Sempahore - releasing - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");
            _localSemaphore.Release();
            Log($"Sempahore - released - Current count is [{_localSemaphore.CurrentCount}]; client thread [{Thread.CurrentThread.ManagedThreadId}]");

            return input;
        }

        private static void Log(string message) => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} : {message}");
    }

    class PartitionInfo
    {
        public string FileName { get; set; }
        public short Current { get; set; }
    }
}

3
你考虑过使用BoundedCapacity块选项吗?(而不是SemaphoreSlim) - Fildor
1
为什么要使用 SemaphoreSlim?Dataflow 已经通过 MaxDegreeOfParallelism 和 BoundedCapacity 设置支持限制流量和反压。我使用 SemaphoreSlim 对象限制在任何时候处理的内存分区的最大数量,这就是 MaxDegreeOfParallelism 的作用。 - Panagiotis Kanavos
1
在Dataflow中使用SemaphoreSlim是不寻常的。要协调多个块,请查看ConcurrentExclusiveTaskScheduler - Stephen Cleary
1
例如,我需要每15分钟处理50K-100K张机票,并从航空公司请求详细的机票记录,解析响应并将其插入数据库。如果我使用无限制的块,则会有100K个请求在机票记录块中等待慢速下载过程完成。XML响应非常大,这将浪费大量RAM。通过将BoundedCapacity设置为8,我只保留8条记录在飞行中。通过将MaxDOP设置为8,我只进行8个并发请求。最后,BatchBlock批处理结果以插入数据库。 - Panagiotis Kanavos
1
不完全是这样。您仍然描述了尝试的解决方案。如果您只想一次处理一个项目,为什么要使用Dataflow呢?这与询问如何创建一系列bash命令的管道但强制工具一次处理一个项目没有区别。在这种情况下,不要使用管道。或在循环内部执行管道。另一方面,让5个块每次处理一行有什么问题吗?无论文件有多大,10MB还是10TB,每次只有5行在内存中。加上流的缓冲区,通常为4KB或8KB。 - Panagiotis Kanavos
显示剩余16条评论
1个回答

2

在实施此解决方案之前,请查看评论,因为您的代码存在基本架构问题。

然而,您发布的问题是可以重现并且可以通过更改以下ExecutionDataflowBlockOption来解决:

new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5, EnsureOrdered = false });

EnsureOrdered属性默认为true。当并行度>1时,不能保证哪个消息会被首先处理。如果首先处理的消息不是块接收到的第一条消息,则它将等待在重新排序缓冲区中,直到其接收的第一条消息完成。因为filter1是TransformManyBlock,我甚至不确定是否可能知道每个消息发送到filter2的顺序。
如果您运行足够多次代码,最终会有幸运的时候,第一条发送到filter2的消息也会首先被处理,在这种情况下,它将释放信号量并继续进行。但是您在处理的下一条消息上仍然会遇到相同的问题;如果它不是接收到的第二条消息,则它将在重新排序缓冲区中等待。

聪明的解释和解决方案! - Theodor Zoulias
谢谢你的回答。让我试一试,然后我会回来反馈。 - Bogdan Rotaru

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接