我有一个25Gb的纯文本文件,大约有1000万行,每行有几百个单词。每一行都需要单独处理,我正在尝试将它们分成若干块交给十几个工作进程并行处理。目前的做法是每次读入100万行数据(即使这些数据在磁盘上未压缩时只有3GB,但内存占用却达到了10GB),将其平均分成12份,然后使用multiprocessing.Pool将其映射到12个工作进程上。
问题是,当我的12个工作进程完成其分配的数据处理后,它们的RAM并没有被释放,而是在下一轮100万行迭代中再增加约10GB。
我已经尝试了使用“del”删除先前的数据,将先前的数据重置为空分配,使用eval()创建可迭代变量名称,在删除后使用gc.collect(),以及将IO完全分离到自己的函数中等方法,但都没有成功解决问题。运行调试显示,Python解释器仅识别预期的数据,无法访问先前迭代的数据,那么为什么内存不会被真正释放呢?
下面的代码是我最新尝试将所有环境分离的版本,虽然这不是最有效的方法,但由于“BigFileOnDisk”在SSD上,每次重新读取文件的时间可以忽略不计,相比实际处理数据来说,这个时间是微不足道的。之前的做法是在分配函数中包含“read”功能,在工作进程完成后删除所有数据,但结果却相同。
问题是,当我的12个工作进程完成其分配的数据处理后,它们的RAM并没有被释放,而是在下一轮100万行迭代中再增加约10GB。
我已经尝试了使用“del”删除先前的数据,将先前的数据重置为空分配,使用eval()创建可迭代变量名称,在删除后使用gc.collect(),以及将IO完全分离到自己的函数中等方法,但都没有成功解决问题。运行调试显示,Python解释器仅识别预期的数据,无法访问先前迭代的数据,那么为什么内存不会被真正释放呢?
下面的代码是我最新尝试将所有环境分离的版本,虽然这不是最有效的方法,但由于“BigFileOnDisk”在SSD上,每次重新读取文件的时间可以忽略不计,相比实际处理数据来说,这个时间是微不足道的。之前的做法是在分配函数中包含“read”功能,在工作进程完成后删除所有数据,但结果却相同。
def allocation():
fileCompleted = False
currentLine = 0
while not fileCompleted:
lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
list_of_values(function_object=worker, inputs=lineData, workers=12)
def read(numLines, startLine=0):
currentLine = 0
lines = []
with open(BigFileOnDisk, 'r') as fid:
for line in fid:
if currentLine >= startLine:
lines.append(line)
if currentLine - startLine >= numLines:
return lines, counter, False
currentLine += 1
# or if we've hit the end of the file
return lines, counter, True
def worker(lines):
outputPath = *root* + str(datetime.datetime.now().time())
processedData = {}
for line in lines:
# process data
del lines
with open(outputPath, 'a') as fid:
for item in processedData:
fid.write(str(item) + ', ' + str(processedData[item]) + '\n')
def list_of_values(function_object, inputs, workers = 10):
inputs_split = []
subsection_start = 0
for n in range(workers):
start = int(subsection_start)
end = int(subsection_start + len(inputs) / workers)
subsection_start = end
inputs_split.append( inputs[start:end] )
p = Pool(workers)
p.map(function_object, inputs_split)
allocation
循环的末尾添加del linedata
,并在list_of_values
的末尾添加del inputs
。 - Broseph