Dask数据框:读取多个文件并将文件名存储在列中

7
我经常使用 dask.dataframe 来读取多个文件,如下所示:
import dask.dataframe as dd

df = dd.read_csv('*.csv')

然而,每行数据的来源,即从哪个文件中读取的数据看起来已经永久丢失。
有没有一种方法可以将其作为列添加进去,例如,如果 file1.csv 是第一个包含 100 行的文件,则 df.loc[:100, 'partition'] = 'file1.csv'。当触发工作流程中的 compute 时,将应用于读入数据帧的每个“partition”/文件。
这样做的想法是可以根据来源应用不同的逻辑。

3
你可能需要查看这个例子,使用dask.delayed构建自定义读取器:https://gist.github.com/mrocklin/e7b7b3a65f2835cda813096332ec73ca - MRocklin
2个回答

8

Dask函数read_csvread_tableread_fwf现在都包含一个参数include_path_column

include_path_column:bool or str, optional
Whether or not to include the path to each particular file.
If True a new column is added to the dataframe called path.
If str, sets new column name. Default is False.

4
假设您有一个名为file_list的列表,其中包含每个CSV文件的文件路径,并且每个单独的文件都适合在RAM中(您提到了100行),那么这应该可以工作:
import pandas as pd
import dask.dataframe as dd
from dask import delayed

def read_and_label_csv(filename):
    # reads each csv file to a pandas.DataFrame
    df_csv = pd.read_csv(filename)
    df_csv['partition'] = filename.split('\\')[-1]
    return df_csv

# create a list of functions ready to return a pandas.DataFrame
dfs = [delayed(read_and_label_csv)(fname) for fname in file_list]
# using delayed, assemble the pandas.DataFrames into a dask.DataFrame
ddf = dd.from_delayed(dfs)

当然需要进行一些定制。如果你的csv文件超出了RAM大小,那么使用dask.DataFrame的连接可能是最好的选择。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接