如何在Python中处理13GB大小的大文件而不崩溃?

4

我需要在一个服务器上(不是我的电脑)处理这个非常大的文件。该服务器运行Python 64位并具有24GB的内存。该文件本身大小约为13GB,包含2700万行数据。考虑到服务器规格相当大,我尝试将整个文件加载到pandas中,但它崩溃了。我尝试使用dask,但速度仍然非常慢。因此,我将文件分成以下块。

我的代码与下面类似。我将文件加载到每个100,000行数据的块中。然后它会处理每个块,并将其附加到现有文件中。我认为通过分块处理,它不会将数据存储在RAM中,但我认为它仍然会这样做。前几百次迭代很好用,但在处理了8GB数据后,它就崩溃了。

chunksize= 100000
c = 0
for chunk in pd.read_csv(fname, chunksize=chunksize,sep='|',error_bad_lines=False):

    chunk['col1'] = chunk['col1'].apply(process1)
    chunk['col2'] = chunk['col2'].apply(process2)

    if c == 0:
        chunk.to_csv("result/result.csv", index=False)
    else:
        chunk.to_csv('result/result.csv', mode='a', header=False, index=False)

    if c%10==0:
        print(c)
        
    c+=1

通常在经过160次迭代、生成8GB的result.csv文件后,程序会因为MemoryError而停止运行。
说实话,我自己也不能访问这个服务器上的很多东西,所以如果你想建议更改一些超出我的权限范围的设置,那我可能无法满足。但还是非常感谢您的帮助。
编辑: 我会在这里添加process1和process2正在进行的内容。
def process1(name):
    if type(name)==str:
        new_name = name[:3]+'*' * len(name[:-3])
    else:
        return name
    
    return new_name

def process2(number):
    if number !=np.nan:
        new_number = str(number)
        new_number = '*'*len(new_number)
        return new_number
    else:
        return number

2020年的今天,13GB已经是一个相对较小的文件了... - ℕʘʘḆḽḘ
process1process2是什么?如果其中一个函数效率低下,那可能就是问题所在。 - David Erickson
@DavidErickson 我刚刚添加了更多细节。 - catris25
1
在编写CSV后,加上del chunk。在for循环中,创建一个新的数据框并将其分配给chunk。在此期间,旧的和新的块同时存在于内存中。在块的末尾执行del操作,就不会有重叠了。 - tdelaney
程序的内存使用是否会一直增加直到崩溃?如果不是,可能是块大小的问题。每次处理50000个项目或更少可能是一个选择。我认为这对处理时间没有太大影响。你甚至可以回到“csv”模块并逐行进行转换。 - tdelaney
显示剩余11条评论
1个回答

3
一个 for 循环的一般语法如下:
for target in expression:
    do all the things

Python会将表达式计算为一个对象,只有在完成后才将该对象分配给目标变量。这意味着任何已经存在于target中的对象在其替换构建完成之前都不会被删除。

除非正在创建的对象很大,否则这并不是什么大问题。但此处情况不同,即将被删除的块在创建新块时仍存储在内存中,实际上使内存占用量翻倍。解决方法是在循环中手动删除目标变量,然后再返回获取更多内容。

for chunk in pd.read_csv(fname, chunksize=chunksize,sep='|',error_bad_lines=False):

    chunk['col1'] = chunk['col1'].apply(process1)
    chunk['col2'] = chunk['col2'].apply(process2)

    if c == 0:
        chunk.to_csv("result/result.csv", index=False)
    else:
        chunk.to_csv('result/result.csv', mode='a', header=False, index=False)
    del chunk # destroy dataframe before next loop to conserve memory.    
    if c%10==0:
        print(c)
    c+=1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接