介绍一下我的背景,我更喜欢使用像H2O这样的开源工具进行超高性能并行CSV文件读取,但该工具在功能集方面受到限制。因此,在将数据馈送到H2O集群进行监督学习之前,我需要编写大量代码来创建数据科学管道。
对于数据科学目的而言,我使用“multiprocessing”库的池对象和映射函数添加了大量并行处理,从而明显提高了读取8GB HIGGS数据集(来自UCI数据仓库)甚至40GB CSV文件的速度。例如,使用最近邻搜索、DBSCAN和马尔可夫聚类算法进行聚类需要一些并行编程技巧才能避免一些严峻的内存和墙钟时间问题。
通常情况下,我会先使用GNU工具将文件按行分成部分,然后使用glob-filemask在Python程序中查找并并行读取它们。我通常使用超过1000个部分文件。通过这些技巧,可以极大地提高处理速度和内存限制。
pandas dataframe.read_csv是单线程的,因此您可以使用map()进行并行执行以使pandas更快。使用htop可以看到,在顺序进行pandas dataframe.read_csv时,100% CPU占用率只是一个核心中的实际瓶颈,而根本不是磁盘。
我还应该指出,我正在使用SSD和快速视频卡总线,并非SATA6总线上的旋转硬盘,同时具有16个CPU核心。
另外,在某些应用程序中,我发现另一种技术也可以很好地运行,即在一个巨大文件内执行并行CSV文件读取,让每个工作进程从不同的偏移量开始,而不是将一个大文件预先分成多个部分文件。在每个并行工作进程中使用Python的file seek()和tell()读取大型文本文件中的条带,使用不同的字节偏移量开始和结束字节位置,并同时进行处理。您可以对字节进行正则表达式查找,并返回行尾符号的数量。这是一个局部小计。最后,将局部小计相加以在工作完成后映射函数返回时获取全局小计。
下面是使用并行字节偏移技巧的一些示例基准测试:
我使用2个文件:HIGGS.csv为8GB。它来自UCI机器学习存储库。all_bin .csv为40.4 GB,来自我的当前项目。我使用2个程序:Linux中自带的GNU wc程序和我开发的纯Python fastread.py程序。
HP-Z820:/mnt/fastssd/fast_file_reader$ ls -l /mnt/fastssd/nzv/HIGGS.csv
-rw-rw-r-- 1 8035497980 Jan 24 16:00 /mnt/fastssd/nzv/HIGGS.csv
HP-Z820:/mnt/fastssd$ ls -l all_bin.csv
-rw-rw-r-- 1 40412077758 Feb 2 09:00 all_bin.csv
ga@ga-HP-Z820:/mnt/fastssd$ time python fastread.py --fileName="all_bin.csv" --numProcesses=32 --balanceFactor=2
2367496
real 0m8.920s
user 1m30.056s
sys 2m38.744s
In [1]: 40412077758. / 8.92
Out[1]: 4530501990.807175
那是大约4.5GB/s或45 Gb/s的文件读取速度。这可不是普通的硬盘,我的朋友。实际上,这是一个Samsung Pro 950固态硬盘。
下面是相同文件被gnu wc逐行计数的速度基准测试结果,这是一个纯C编译程序。
有趣的是,你可以看到我的纯Python程序在这种情况下基本上与gnu wc编译的C程序的速度相匹配。Python是解释性语言,而C是编译性语言,所以我认为这是一项相当有趣的速度成果。当然,wc真的需要改成并行程序,那么它就能轻松地击败我的python程序了。但就目前而言,gnu wc只是一个顺序执行程序。你知道自己能做什么,而Python今天可以并行。Cython编译可能会对我有所帮助(时间另议)。此外,内存映射文件还没有被探索过。
HP-Z820:/mnt/fastssd$ time wc -l all_bin.csv
2367496 all_bin.csv
real 0m8.807s
user 0m1.168s
sys 0m7.636s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.257s
user 0m12.088s
sys 0m20.512s
HP-Z820:/mnt/fastssd/fast_file_reader$ time wc -l HIGGS.csv
11000000 HIGGS.csv
real 0m1.820s
user 0m0.364s
sys 0m1.456s
结论:与C程序相比,这个纯Python程序的速度很快。然而,在行计数的目的下,仍然不足以使用纯Python程序代替C程序。通常,该技术可用于其他文件处理,因此这个Python代码仍然很好。
问题:编译正则表达式一次并将其传递给所有工作进程是否会提高速度?答案:在这个应用程序中,正则表达式预编译并不能提高性能。我认为原因是所有工作进程的进程序列化和创建的开销占主导。
还有一件事情。并行CSV文件读取真的有帮助吗?瓶颈是磁盘还是CPU?许多在stackoverflow上所谓的最佳答案包含了一个普遍的开发人员智慧,即你只需要一个线程来读取文件,他们说的最好。但他们确定吗?让我们找出答案:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=16 --balanceFactor=2
11000000
real 0m2.256s
user 0m10.696s
sys 0m19.952s
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=1
11000000
real 0m17.380s
user 0m11.124s
sys 0m6.272s
没错,它确实可以。并行文件读取效果很好。就是这样!
附:如果您想知道,如果在单个工作进程中使用balanceFactor为2会怎样?好吧,情况非常糟糕:
HP-Z820:/mnt/fastssd/fast_file_reader$ time python fastread.py --fileName="HIGGS.csv" --numProcesses=1 --balanceFactor=2
11000000
real 1m37.077s
user 0m12.432s
sys 1m24.700s
快速阅读.py Python程序的关键部分:
fileBytes = stat(fileName).st_size
startByte, endByte = PartitionDataToWorkers(workers=numProcesses, items=fileBytes, balanceFactor=balanceFactor)
p = Pool(numProcesses)
partialSum = p.starmap(ReadFileSegment, zip(startByte, endByte, repeat(fileName)))
globalSum = sum(partialSum)
print(globalSum)
def ReadFileSegment(startByte, endByte, fileName, searchChar='\n'):
with open(fileName, 'r') as f:
f.seek(startByte-1)
bytes = f.read(endByte - startByte + 1)
cnt = len(re.findall(searchChar, bytes))
return cnt
< p >PartitionDataToWorkers的定义只是普通的顺序代码。我刻意省略了它,以便其他人可以练习并了解并行编程的工作方式。更难的部分是我免费提供的:经过测试和工作正常的并行代码,有助于你的学习。< /p >
< p >感谢开源H2O项目,由Arno、Cliff和H2O员工开发的优秀软件和教学视频,为上述纯Python高性能并行字节偏移量读取器提供了灵感。H2O使用Java进行并行文件读取,可由Python和R程序调用,并且速度非常快,在读取大型CSV文件方面比任何其他软件都要快。< /p >