如何在Python中合并大型CSV文件?

9
我有18个csv文件,每个大约1.6GB,每个文件包含大约1200万行数据。每个文件代表一年的数据。我需要合并所有这些文件,提取特定地理位置的数据,然后分析时间序列。如何最好地做到这一点?
我尝试使用pd.read_csv但是我遇到了内存限制。我已经尝试包括chunk size参数,但这给了我一个TextFileReader对象,我不知道如何将它们组合成一个dataframe。我还尝试过pd.concat,但这也不起作用。

需要使用pandas吗?所有文件的csv数据格式都相同吗?如果是的话,您可以尝试逐行读取/写入源/目标文件,避免内存问题。 - martyn
1
你可以尝试使用 dask,因为它更适合在内存中管理如此大的文件。 - AlCorreia
可能是读取大型.csv文件的重复问题。 - PV8
有关这个主题有几个讨论:https://dev59.com/3GQm5IYBdhLWcg3wowWw - PV8
@martyn 不一定非得用pandas,但作为初学者我不知道还能用什么。 - ChrisB
3个回答

22

以下是使用 pandas 组合大型 csv 文件的优雅方法。该技术是每次迭代将定义为 CHUNK_SIZE 的行数载入内存,直到完成。这些行将以“附加”模式附加到输出文件中。

import pandas as pd

CHUNK_SIZE = 50000
csv_file_list = ["file1.csv", "file2.csv", "file3.csv"]
output_file = "./result_merge/output.csv"

for csv_file_name in csv_file_list:
    chunk_container = pd.read_csv(csv_file_name, chunksize=CHUNK_SIZE)
    for chunk in chunk_container:
        chunk.to_csv(output_file, mode="a", index=False)

但是,如果你的文件包含头部信息,那么在接下来的文件中跳过头部信息是有道理的,除了第一个文件外。重复的头部信息是不可预期的。在这种情况下,可以采用以下解决方案:

import pandas as pd

CHUNK_SIZE = 50000
csv_file_list = ["file1.csv", "file2.csv", "file3.csv"]
output_file = "./result_merge/output.csv"

first_one = True
for csv_file_name in csv_file_list:

    if not first_one: # if it is not the first csv file then skip the header row (row 0) of that file
        skip_row = [0]
    else:
        skip_row = []

    chunk_container = pd.read_csv(csv_file_name, chunksize=CHUNK_SIZE, skiprows = skip_row)
    for chunk in chunk_container:
        chunk.to_csv(output_file, mode="a", index=False)
    first_one = False

2
你应该在to_csv()中添加header=False,否则每次写入块时都会被写入一个标题。在我的情况下,我的输入数据没有标题,所以read_csv()将第一行解释为标题,而to_csv()在写入每个块时插入了第一行。如果你需要从输入文件中获取第一行,则在read_csv()中添加header=None。 - codingmonkey87

3

内存限制被触及是因为您尝试将整个CSV文件加载到内存中。一个简单的解决方案是逐行读取文件(假设您的所有文件具有相同的结构),控制其内容,然后将其写入目标文件:

filenames = ["file1.csv", "file2.csv", "file3.csv"]
sep = ";"

def check_data(data):
    # ... your tests
    return True # << True if data should be written into target file, else False

with open("/path/to/dir/result.csv", "a+") as targetfile:
    for filename in filenames :
        with open("/path/to/dir/"+filename, "r") as f:
            next(f) # << only if the first line contains headers
            for line in f:
                data = line.split(sep)
                if check_data(data):
                    targetfile.write(line)
更新:根据您的评论,以下是check_data方法的示例:
def check_data(data):
    return data[n] == 'USA' # < where n is the column holding the country

1
请注意,如果您的分隔符字符也出现在字段内部,则此操作将失败/表现异常。在这种情况下,您可能需要更复杂的解析行数据的方法。 - Tom Dalton
这样会创建一个我想要的数据的 CSV 文件,然后我可以重新导入并从中进行分析吗? - ChrisB
不,这将逐行读取您的所有csv文件,并仅在通过“check_data”方法时将每行写入目标文件。(在使用此解决方案时未受到任何内存损坏) - olinox14
如果在 check_data 函数中,我想仅针对每个文件中“Country”列中包含“USA”的行进行操作,应该如何编写代码?抱歉问题比较简单。 - ChrisB

1
您可以使用 pd.DataFrameTextFileReader 对象转换为数据框,方法如下:df = pd.DataFrame(chunk),其中 chunk 的类型是 TextFileReader。然后,您可以使用 pd.concat 来连接各个数据帧。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接