并行解压缩日志文件 - 调整最大并行度以获得最高吞吐量

15

每天我们有高达30GB的GZipped日志文件。每个文件包含100,000行,压缩后大小在6到8MB之间。简化的代码已剥离解析逻辑,并利用Parallel.ForEach循环。

处理的最大行数峰值出现在两个NUMA节点、32个逻辑CPU盒子(Intel Xeon E7-2820 @ 2 GHz)的MaxDegreeOfParallelism为8时:

using System;

using System.Collections.Concurrent;

using System.Linq;
using System.IO;
using System.IO.Compression;

using System.Threading.Tasks;

namespace ParallelLineCount
{
    public class ScriptMain
    {
        static void Main(String[] args)
        {
            int    maxMaxDOP      = (args.Length > 0) ? Convert.ToInt16(args[0]) : 2;
            string fileLocation   = (args.Length > 1) ? args[1] : "C:\\Temp\\SomeFiles" ;
            string filePattern    = (args.Length > 1) ? args[2] : "*2012-10-30.*.gz";
            string fileNamePrefix = (args.Length > 1) ? args[3] : "LineCounts";

            Console.WriteLine("Start:                 {0}", DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
            Console.WriteLine("Processing file(s):    {0}", filePattern);
            Console.WriteLine("Max MaxDOP to be used: {0}", maxMaxDOP.ToString());
            Console.WriteLine("");

            Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");

            for (int maxDOP = 1; maxDOP <= maxMaxDOP; maxDOP++)
            {

                // Construct ConcurrentStacks for resulting strings and counters
                ConcurrentStack<Int64> TotalLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalSomeBookLines = new ConcurrentStack<Int64>();
                ConcurrentStack<Int64> TotalLength = new ConcurrentStack<Int64>();
                ConcurrentStack<int>   TotalFiles = new ConcurrentStack<int>();

                DateTime FullStartTime = DateTime.Now;

                string[] files = System.IO.Directory.GetFiles(fileLocation, filePattern);

                var options = new ParallelOptions() { MaxDegreeOfParallelism = maxDOP };

                //  Method signature: Parallel.ForEach(IEnumerable<TSource> source, Action<TSource> body)
                Parallel.ForEach(files, options, currentFile =>
                    {
                        string filename = System.IO.Path.GetFileName(currentFile);
                        DateTime fileStartTime = DateTime.Now;

                        using (FileStream inFile = File.Open(fileLocation + "\\" + filename, FileMode.Open))
                        {
                            Int64 lines = 0, someBookLines = 0, length = 0;
                            String line = "";

                            using (var reader = new StreamReader(new GZipStream(inFile, CompressionMode.Decompress)))
                            {
                                while (!reader.EndOfStream)
                                {
                                    line = reader.ReadLine();
                                    lines++; // total lines
                                    length += line.Length;  // total line length

                                    if (line.Contains("book")) someBookLines++; // some special lines that need to be parsed later
                                }

                                TotalLines.Push(lines); TotalSomeBookLines.Push(someBookLines); TotalLength.Push(length);
                                TotalFiles.Push(1); // silly way to count processed files :)
                            }
                        }
                    }
                );

                TimeSpan runningTime = DateTime.Now - FullStartTime;

                // Console.WriteLine("MaxDOP,FilesProcessed,ProcessingTime[ms],BytesProcessed,LinesRead,SomeBookLines,LinesPer[ms],BytesPer[ms]");
                Console.WriteLine("{0},{1},{2},{3},{4},{5},{6},{7}",
                    maxDOP.ToString(),
                    TotalFiles.Sum().ToString(),
                    Convert.ToInt32(runningTime.TotalMilliseconds).ToString(),
                    TotalLength.Sum().ToString(),
                    TotalLines.Sum(),
                    TotalSomeBookLines.Sum().ToString(),
                    Convert.ToInt64(TotalLines.Sum() / runningTime.TotalMilliseconds).ToString(),
                    Convert.ToInt64(TotalLength.Sum() / runningTime.TotalMilliseconds).ToString());

            }
            Console.WriteLine();
            Console.WriteLine("Finish:                " + DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ss.fffffffZ"));
        }
    }
}

以下是结果摘要,可以清晰地看到MaxDegreeOfParallelism=8处有明显的高峰:

enter image description here

CPU负载(此处显示的是聚合值,大部分负载都在单个NUMA节点上,即使DOP在20到30之间):

enter image description here

我发现让CPU负载超过95%的唯一方法是将文件分成4个不同的文件夹并执行相同的命令4次,每次针对所有文件的子集。

有人能找到瓶颈吗?


2
也许你的文件系统是限制因素... 作为一个测试,你可以尝试将文件加载到内存流中,并使用它来代替文件流... - Yahia
它已经在附加的截图中了 - 你需要它的文本格式吗? - milivojeviCH
不,谢谢,我只是想阅读代码,跳过分析部分 :) - ulrichb
所有的600MB源文件都在缓存中,磁盘没有被使用。 - milivojeviCH
1
@mceda 有趣的行为。我尝试在我的一个服务器上使用你的样本重现你的结果,但是运气不太好;我的测试能够在测试期间维持100%的CPU使用率。由于之前的回复似乎排除了I/O瓶颈的可能性,那么这个进程是否存在处理器亲和力的限制或者线程池中可用线程的上限较低的可能性呢? - Monroe Thomas
显示剩余5条评论
4个回答

9

可能问题之一是默认的FileStream构造函数使用的缓冲区大小较小。我建议您使用更大的输入缓冲区,例如:

using (FileStream infile = new FileStream(
    name, FileMode.Open, FileAccess.Read, FileShare.None, 65536))

默认缓冲区大小为4千字节,这意味着线程需要经常调用I/O子系统来填充其缓冲区。64K的缓冲区意味着您将更少地进行这些调用。
我发现32K到256K之间的缓冲区大小可以获得最佳性能,其中64K是“甜点”,当我进行详细测试时。大于256K的缓冲区大小实际上开始降低性能。
此外,虽然这不太可能对性能产生重大影响,但您可能应该使用64位整数替换那些ConcurrentStack实例,并使用Interlocked.Add或Interlocked.Increment更新它们。这简化了您的代码并消除了管理集合的需要。
更新:
重新阅读您的问题描述,我被以下声明所打动:
"The only way I've found to make CPU load cross 95% mark was to split the files across 4 different folders and execute the same command 4 times, each one targeting a subset of all files."
对我来说,这指向了打开文件的瓶颈。就像操作系统在目录上使用互斥锁一样。即使所有数据都在缓存中,不需要物理I/O,进程仍然必须等待此锁定。还有可能是文件系统正在写入磁盘。请记住,每次打开文件时,它都必须更新文件的Last Access Time。
如果I/O确实是瓶颈,那么您可以考虑有一个单独的线程仅加载文件并将其放入BlockingCollection或类似的数据结构中,以便处理线程不必争夺目录上的锁。您的应用程序成为具有一个生产者和N个消费者的生产者/消费者应用程序。

你有这段代码的指针吗:“我的解决方案是将缓冲区大小设置为4MB,并在读取此类缓冲区时放置锁定。这将所有IO转换为顺序IO。” - milivojeviCH
1
@mceda 我编写了一个自定义流类,它与该类的其他实例协调。所有实例共享一个公共锁,并在读取和写入调用期间获取它。这确保只有一个应用程序同时访问磁盘。结合高缓冲区大小,这可以实现良好的顺序IO。 - usr
可能是一个愚蠢的评论,但你尝试过其他更快的解压库吗? 这里有一些:https://dev59.com/XXA75IYBdhLWcg3wdIp0 我自己使用过DotNetZip,在压缩前文件超过5GB时,我注意到解压速度提高了约10-20%。 - ElvisLives
就此而言,GzipStream类是一个简单的包装器,它围绕着一个具有8192字节缓冲区的DeflateStream进行同步读取。 - Monroe Thomas
授予此答案赏金,尽管它没有完全帮助。任务仍然存在。 - milivojeviCH
显示剩余5条评论

2

我认为问题在于您正在使用阻塞式I/O,因此您的线程无法充分利用并行性。

如果我正确理解了您的算法(抱歉,我更擅长C++),那么每个线程都在执行以下操作(伪代码):

while (there is data in the file)
    read data
    gunzip data

相反,更好的方法应该是这样的:
N = 0
read data block N
while (there is data in the file)
    asyncRead data block N+1
    gunzip data block N
    N = N + 1
gunzip data block N
asyncRead调用不会阻塞,因此基本上你可以并发地对块N进行解码和读取块N +1,这样当你完成对块N的解码时,你可能已经准备好了块N +1(如果I/O比解码更慢,则准备好块N +1的时间会更靠近)。
然后就是找到能够给你最佳吞吐量的块大小。
祝好运。

2
这通常是因为线程同步过于频繁的缘故。
在您的代码中寻找同步,我可以看到集合上存在大量同步。您的线程正在逐个推送行。这意味着每一行最多会产生一个Interlocked操作,最坏的情况下会产生内核模式锁等待。Interlocked操作将发生激烈争用,因为所有线程都竞争将其当前行放入集合中。他们都试图更新相同的内存位置。这会导致高速缓存行反应迟钝。
将其更改为以更大的块推送行。将100行或更多行的行数组推送出去。越多越好。
换句话说,首先在线程本地集合中收集结果,只有在必要时才合并到全局结果中。
您甚至可能想要完全摆脱手动数据推送。这就是PLINQ的用途:同时流式传输数据。 PLINQ以良好的性能方式抽象了所有并发集合操作。

1
唯一的“推送”发生在最后,在整个文件的结果可用之后(在这种情况下,等于99个文件的数量)。或者,我有什么遗漏的吗?我会看看PLINQ,谢谢! - milivojeviCH
2
我同意,我误读了代码。不过我还是会留下这个答案,因为它包含了一些有用的想法。 - usr
我完全同意这个想法,在实际应用中,我们会在文件处理结束时将所有解析的行一次性推送到一个批处理中。 - milivojeviCH

2
我认为并行读取磁盘不会对您有所帮助。实际上,这可能会严重影响性能,因为同时从多个存储区域读取会产生争用。
我建议重新构建程序,首先将原始文件数据单线程读入字节数组的内存流中。然后,在每个流或缓冲区上使用Parallel.ForEach()进行解压缩和计数。
您需要在最开始时承担IO读取负担,但让操作系统/硬件优化希望大部分是顺序读取,然后在内存中进行解压缩和解析。
请记住,像解压缩、Encoding.UTF8.ToString()、String.Split()等操作会使用大量内存,因此在不再需要它们时清理引用/处理旧缓冲区。
如果按照这种方式操作,我相信您可以使机器产生一些严重的浪费。
希望这可以帮到您。

@mceda - 真的吗?其他的建议似乎都没有做到这一点。将解压缩移入单线程部分怎么样?也许那里会发生一些串行化的事情?另一个尝试的方法是转储ConcurrentStacks。我已经注意到它们存在一些限制性行为。你应该能够用基本的Dictionary<string, byte[]>结构来替换它,并将键传递给线程lambda函数。这应该是相当安全的。 - saarp
1
@mceda - 好的,我错过了最后一部分。如果你已经尝试过这个,能否发布那个版本的代码? - saarp
我已经失去了尝试了多少种不同的方法的计数。最新的尝试包括并发可视化器输出等。我会有一个新问题,这一个有很多好的提示,但不幸的是没有一个能直接帮助我的情况。 - milivojeviCH
1
@mceda - 我理解这很令人沮丧。不过,这个帖子里有很多好的信息。如果失去它,那真是太可惜了,而且我也不确定另一个问题会得到不同的结果。不幸的是,随着stackoverflow变得越来越受欢迎,质量在过去几个月里似乎已经下降了很多。 - saarp
没问题,我会尝试在多个地方发布这些发现。对于这个问题,如果我将最初的问题更改为最新状态,大部分评论都将变得无关紧要。我可能会发布一个摘要“尝试回答”并接受它 :) - milivojeviCH
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接