使用Python multiprocessing处理一个读取文件的生成器

11

我试图读取和处理成千上万个文件,但不幸的是,处理文件所需的时间约为从磁盘读取的时间的3倍,因此我希望在读入文件时就处理这些文件(并且同时继续读入其他文件)。

在理想情况下,我有一个生成器一次读取一个文件,并且我想将此生成器传递给一组工作线程,以便它们可以处理由生成器生成的项目。

这里是一个示例:

def process_file(file_string):
     ...
     return processed_file

 pool = Pool(processes=4)
 path = 'some/path/'
 results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

上述代码唯一的问题是在池开始之前所有文件都被读入内存,这意味着我需要等待磁盘将所有内容读入内存,并且会消耗大量的内存。


尝试在pool.map()调用中指定一个 chunksize 参数,以控制在每次提交可迭代对象的多少个元素作为独立任务提交到Pool - martineau
2个回答

10

Pool.mapPool.map_async listify 传递给它们的 iterable ,所以在处理开始之前,您的生成器将始终被完全实现。

各种 Pool.imap* 函数似乎会将输入作为生成器进行处理,因此您可能可以更改:

results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))

至:

# If you can process outputs one at a time, drop the list wrapper
# If you can process outputs without order mattering, imap_unordered will
# get you the best results
results = list(pool.imap(process_file, (open(path+part,'rb').read() for part in os.listdir(path))))

根据我的理解,你可以在处理之前不需要 slurping,但他们似乎仍然会尽可能快地完全填充队列,这可能会导致大量未完成的数据和过度的内存使用; 另外,您将在一个进程中读取所有数据,然后将其全部通过 IPC 发送,这意味着您仍然主要受到 I/O 的限制。

在你的位置上,我会把读取操作移到任务本身中(如果可以的话,避免一次性读取整个文件,而是按行或块来处理)。这样可以实现并行读取,减少IPC,而且不会冒着在前几个文件被处理之前就读取了所有文件的风险;您永远不会打开超过您拥有的工作进程数量的文件。因此最终结果看起来会像:

def process_file(path):
     with open(path, 'rb') as f:
         file_string = f.read()
     ... same as before ...
     return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.imap(process_file, (os.path.join(path, part) for part in os.listdir(path)))

如果有多个子进程尝试读取文件,我认为这将导致交错的磁盘读取和磁盘抖动? - mgoldwasser
@mgoldwasser:这取决于存储介质;例如在NFS上,你通常会受到延迟或每个连接带宽的限制,而不是同时读取。需要进行测试以确定是否存在性能问题。如果读取块或行而不是一次性读取整个文件(正如我在答案中建议的那样,尽管我缺乏提供有用示例的信息),则在块读取之间执行的工作可能会减少冲突。或者,在一次性读取时,可以使用单个multiprocessing.Lock将读取限制为一次只能由一个工作进程完成。最佳选项因情况而异。 - ShadowRanger
mgoldwasser,你可以测量一下;)只需比较两种可能的解决方案:我的带有yield和@ShadowRanger的带有多进程读取。因为它取决于硬件/软件/网络... - Jimilian
@ShadowRanger发现将文件读取移至process_file函数中对运行时间没有影响。使用imap()是提高速度的真正关键,谢谢! - mgoldwasser

2
您正在将文件读入父进程的内存中,然后将有效负载传输到子进程中。这相当低效。只发送文件名,让子进程执行I/O操作。如果结果是一堆文本,您计划将其写入文件,则也应在子进程中执行该操作。
通常,map会一次性向其池工人发出大块的工作,以减少通信开销。这可能是您遇到大内存峰值的原因。仅传递文件名可以解决此问题,但在工人之间存在处理时间不均匀时,设置小的块大小仍然是有益的。
def process_file(filename):
     with open(filename, 'rb') as fp:
         file_string = fp.read()
     ...
     return processed_file

 pool = Pool(processes=4)
 path = 'some/path/'
 results = pool.map(process_file, path+part for part in os.listdir(path)), chunksize=1)

如果有多个子进程尝试读取文件,我认为这将导致交错的磁盘读取和磁盘抖动? - mgoldwasser
@mgoldwasser - 它是可以的,但是你希望保持你的磁盘通道保持运行,通过让一些工作人员获取数据,而其他人忙于处理数据。并且队列重新排序在磁头飞过数据时有一定的优势。使用4个工作人员读取数据时,你不会遇到显著的抖动问题。 - tdelaney
这种方法确实有效,并且可以大大缩短时间。使用imap()而不是map()可以获得最大的收益。 - mgoldwasser
感谢更新。在现实世界中,多进程调整的工作方式可能会令人惊讶。 - tdelaney

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接