将Dask Pandas DataFrames的Bag转换为单个Dask DataFrame

3

问题概述

简短版

如何将Dask Bag of Pandas DataFrames转换为单个Dask DataFrame?

详细版

我有一些文件,无法通过任何dask.dataframe的各种read函数(例如dd.read_csvdd.read_parquet)进行读取。我有自己的函数,可以将它们作为Pandas DataFrames读入(该函数仅在一次处理一个文件时起作用,类似于pd.read_csv)。我希望将所有这些单个的Pandas DataFrames合并为一个大的Dask DataFrame。

最小工作示例

以下是一些示例CSV数据(我的实际数据不是在CSV中,但在此处使用它以便于演示)。要创建最小的工作示例,您可以将其保存为CSV并制作几个副本,然后使用下面的代码

"gender","race/ethnicity","parental level of education","lunch","test preparation course","math score","reading score","writing score"
"female","group B","bachelor's degree","standard","none","72","72","74"
"female","group C","some college","standard","completed","69","90","88"
"female","group B","master's degree","standard","none","90","95","93"
"male","group A","associate's degree","free/reduced","none","47","57","44"
"male","group C","some college","standard","none","76","78","75"

from glob import glob
import pandas as pd
import dask.bag as db

files = glob('/path/to/your/csvs/*.csv')
bag = db.from_sequence(files).map(pd.read_csv)

到目前为止,我尝试过的方法:

import pandas as pd
import dask.bag as db
import dask.dataframe as dd

# Create a Dask bag of pandas dataframes
bag = db.from_sequence(list_of_files).map(my_reader_function)

df = bag.map(lambda x: x.to_records()).to_dataframe() # this doesn't work
df = bag.map(lambda x: x.to_dict(orient = <any option>)).to_dataframe() # neither does this

# This gets me really close. It's a bag of Dask DataFrames. 
# But I can't figure out how to concatenate them together
df = bag.map(dd.from_pandas, npartitions = 1)

df = dd.from_delayed(bag) # returns an error

您提供的示例数据可以轻松转换为pandas数据框,并保存为不同的文件,然后可以将它们读入单个dask数据框中。因此,不清楚为什么需要使用dask bag并将其转换为dask数据框。 - KRKirov
你问了一个好问题。我应该澄清一下:我有几百万个这些文件需要读取,每个文件的大小从几KB到许多MB不等。因此,我没有空间创建可由dask读取的文件副本。 - natemcintosh
如果您的自定义读取函数输出一个单独的pandas数据框,那么您不能将输出转换为dask数据框,并继续这样做并附加到dask数据框吗? - KRKirov
这确实非常接近我想要的!但是,我还没有找到一种不需要同时将所有文件读入内存的方法来实现这一点。bag = db.from_sequence(files).map(pd.read_csv).map(dd.from_pandas, npartitions=1) 然后 dd.concat(bag.compute()) 确实可以工作;但它会将所有内容都读入内存,这对于数百万个文件是行不通的。 - natemcintosh
3个回答

3

这正是我一直在寻找的!真不敢相信我之前居然没看到过这个页面。谢谢! - natemcintosh

1
这里有两个可能的解决方案:
1. 将包转换为数据帧列表,然后使用 dd.multi.concat
bag #a dask bag of dataframes
list_of_dfs = bag.compute()
df = dd.multi.concat(list_of_dfs).compute()

2. 转换为字典集合,并使用 bag.to_dataframe:

bag_of_dicts = bag.map(lambda df: df.to_dict(orient='records')).flatten()
df = bag_of_dicts.to_dataframe().compute()

在我的具体应用场景中,选项#2的性能比选项#1更好。

0
如果您已经有了一个数据框的集合,那么可以按照以下步骤进行操作:
  1. 将集合转换为延迟分区
  2. 通过连接延迟分区将其转换为数据框的延迟对象
  3. 使用这些延迟对象创建数据框
Python 代码如下:
def bag_to_dataframe(bag, **concat_kwargs):
    partitions = bag.to_delayed()
    dataframes = map(
        dask.delayed(lambda partition: pandas.concat(partition, **concat_kwargs)),
        partitions
    )
    return dask.dataframe.from_delayed(dataframes)

你可能想要控制分区的连接方式,例如忽略索引。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接