我试图读取和处理成千上万个文件,但不幸的是,处理文件所需的时间约为从磁盘读取的时间的3倍,因此我希望在读入文件时就处理这些文件(并且同时继续读入其他文件)。
在理想情况下,我有一个生成器一次读取一个文件,并且我想将此生成器传递给一组工作线程,以便它们可以处理由生成器生成的项目。
这里是一个示例:
def process_file(file_string):
...
return processed_file
pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path+part,'rb').read() for part in os.listdir(path)))
上述代码唯一的问题是在池开始之前所有文件都被读入内存,这意味着我需要等待磁盘将所有内容读入内存,并且会消耗大量的内存。
pool.map()
调用中指定一个chunksize
参数,以控制在每次提交可迭代对象的多少个元素作为独立任务提交到Pool
。 - martineau