Python在并行处理大文件时不释放内存。

4
我有一个25Gb的纯文本文件,大约有1000万行,每行有几百个单词。每一行都需要单独处理,我正在尝试将它们分成若干块交给十几个工作进程并行处理。目前的做法是每次读入100万行数据(即使这些数据在磁盘上未压缩时只有3GB,但内存占用却达到了10GB),将其平均分成12份,然后使用multiprocessing.Pool将其映射到12个工作进程上。
问题是,当我的12个工作进程完成其分配的数据处理后,它们的RAM并没有被释放,而是在下一轮100万行迭代中再增加约10GB。
我已经尝试了使用“del”删除先前的数据,将先前的数据重置为空分配,使用eval()创建可迭代变量名称,在删除后使用gc.collect(),以及将IO完全分离到自己的函数中等方法,但都没有成功解决问题。运行调试显示,Python解释器仅识别预期的数据,无法访问先前迭代的数据,那么为什么内存不会被真正释放呢?
下面的代码是我最新尝试将所有环境分离的版本,虽然这不是最有效的方法,但由于“BigFileOnDisk”在SSD上,每次重新读取文件的时间可以忽略不计,相比实际处理数据来说,这个时间是微不足道的。之前的做法是在分配函数中包含“read”功能,在工作进程完成后删除所有数据,但结果却相同。
def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
            if currentLine - startLine >= numLines:
                return lines, counter, False
            currentLine += 1
        # or if we've hit the end of the file
        return lines, counter, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        # process data

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    p = Pool(workers)
    p.map(function_object, inputs_split)

我们需要看到你的代码! - kirbyfan64sos
最好提供一个 [mcve]。 - jpmc26
代码已发布 - MKennedy
尝试在allocation循环的末尾添加del linedata,并在list_of_values的末尾添加del inputs - Broseph
请参考这个答案。同时也可以查看这篇文章 - Dan
显示剩余4条评论
1个回答

4

您没有加入子进程。在list_of_values完成后,由Pool创建的进程仍然存在(有点像僵尸,但父进程是活着的)。它们仍然持有它们所有的值。您无法在主进程中看到它们的数据,因为它在另一个进程中(出于同样的原因,gc.collect不起作用)。

要释放工作进程分配的内存,您需要手动加入Pool或使用with

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    with Pool(workers) as p:
        p.map(function_object, inputs_split)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接