使用Pandas的多进程读取csv文件的最简单方法

18

这是我的问题。
有许多 .csv 文件(或其他文件),Pandas 是一种将它们读取并保存为 Dataframe 格式的简单方法。但当文件数量很多时,我想使用多进程来读取文件以节省时间。

我的早期尝试

我手动将文件分成不同的路径。分别使用:

os.chdir("./task_1")
files = os.listdir('.')
files.sort()
for file in files:
    filename,extname = os.path.splitext(file)
    if extname == '.csv':
        f = pd.read_csv(file)
        df = (f.VALUE.as_matrix()).reshape(75,90)   

然后将它们组合在一起。

如何使用 pool 运行它们以解决我的问题?
任何建议将不胜感激!


1
如果你还没有看过的话,这篇文章值得一读:https://dev59.com/cmYq5IYBdhLWcg3wtCzO 或者 https://www.reddit.com/r/Python/comments/3been9/pandas_speed_up_read_csv_with_multiprocessing/ - Alexander
4个回答

35

使用Pool

import os
import pandas as pd 
from multiprocessing import Pool

# wrap your csv importer in a function that can be mapped
def read_csv(filename):
    'converts a filename to a pandas dataframe'
    return pd.read_csv(filename)


def main():

    # get a list of file names
    files = os.listdir('.')
    file_list = [filename for filename in files if filename.split('.')[1]=='csv']

    # set up your pool
    with Pool(processes=8) as pool: # or whatever your hardware can support

        # have your pool map the file names to dataframes
        df_list = pool.map(read_csv, file_list)

        # reduce the list of dataframes to a single dataframe
        combined_df = pd.concat(df_list, ignore_index=True)

if __name__ == '__main__':
    main()

df_list 包含什么内容? - seralouk
@serafeim df_list 是由进程池产生的 pd.DataFrame 列表。 - zemekeneng
这段代码会不会有任何情况不能正常工作?我曾尝试读取 Excel 文件,但有时会卡住。 - Ipa

7

dask库的设计不仅可以解决您的问题,当然也包括其他问题。


2
我无法使用 map/map_async,但已成功使用 apply_async。有两种可能的方法(我不知道哪一种更好):A) 在结尾处连接;B) 过程中连接。我发现使用 glob 可以轻松地从目录中列出并筛选文件。
from glob import glob
import pandas as pd
from multiprocessing import Pool

folder = "./task_1/" # note the "/" at the end
file_list = glob(folder+'*.xlsx')

def my_read(filename):
    f = pd.read_csv(filename)
    return (f.VALUE.as_matrix()).reshape(75,90)

#DF_LIST = [] # A) end
DF = pd.DataFrame() # B) during

def DF_LIST_append(result):
    #DF_LIST.append(result) # A) end
    global DF # B) during
    DF = pd.concat([DF,result], ignore_index=True) # B) during

pool = Pool(processes=8)

for file in file_list:
    pool.apply_async(my_read, args = (file,), callback = DF_LIST_append)

pool.close()
pool.join()

#DF = pd.concat(DF_LIST, ignore_index=True) # A) end

print(DF.shape)

1
如果您不反对使用其他库,可以使用Graphlab的sframe。这将创建一个类似于数据框架的对象,如果性能是一个大问题,非常快速地读取数据。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接