使用 dask.delayed 和 pandas.DataFrame 将 Dask 词袋(dask.bag)字典转换为 Dask 数据帧(dask.dataframe)。

5

我正在努力将一个字典的dask.bag转换为dask.delayedpandas.DataFrames,最终形成一个dask.dataframe

我有一个函数(make_dict),将文件读入到一个相当复杂的嵌套字典结构中,还有另一个函数(make_df),将这些字典转换为一个pandas.DataFrame(每个文件的结果数据框大小约为100MB)。我希望将所有数据框添加到一个单独的dask.dataframe中以进行进一步分析。

到目前为止,我一直在使用dask.delayed对象来加载、转换和追加所有数据,这是有效的(请参见下面的示例)。然而,出于未来的工作考虑,我希望使用dask.persist()将已加载的字典存储在dask.bag中。

我设法将数据加载到dask.bag中,产生了一个字典列表或pandas.DataFrame列表,在调用compute()之后,我可以在本地使用它们。但是,当我尝试使用to_delayed()dask.bag转换为dask.dataframe时,我卡住了(请参见下面的错误)。

感觉我在这里缺少了一些非常简单的东西,或者我的dask.bag方法是错误的?

下面的示例显示了使用简化函数的方法,并抛出了相同的错误。任何有关如何解决这个问题的建议都将不胜感激。

import numpy as np
import pandas as pd
import dask
import dask.dataframe
import dask.bag

print(dask.__version__) # 1.1.4
print(pd.__version__) # 0.24.2

def make_dict(n=1):
    return {"name":"dictionary","data":{'A':np.arange(n),'B':np.arange(n)}}

def make_df(d):
    return pd.DataFrame(d['data'])

k = [1,2,3]

# using dask.delayed
dfs = []
for n in k:
    delayed_1 = dask.delayed(make_dict)(n)
    delayed_2 = dask.delayed(make_df)(delayed_1)
    dfs.append(delayed_2)
ddf1 = dask.dataframe.from_delayed(dfs).compute() # this works as expected

# using dask.bag and turning bag of dicts into bag of DataFrames
b1 = dask.bag.from_sequence(k).map(make_dict)
b2 = b1.map(make_df)

df = pd.DataFrame().append(b2.compute()) # <- I would like to do this using delayed dask.DataFrames like above
ddf2 = dask.dataframe.from_delayed(b2.to_delayed()).compute() # <- this fails

# error:
# ValueError: Expected iterable of tuples of (name, dtype), got [   A  B
# 0  0  0]

我最终想要使用分布式调度程序实现的目标:

b = dask.bag.from_sequence(k).map(make_dict)
b = b.persist()
ddf = dask.dataframe.from_delayed(b.map(make_df).to_delayed())
1个回答

2
在袋子的情况下,延迟对象指向元素列表,因此您拥有一系列Pandas数据帧的列表,这不是您想要的。两个建议:
  1. 坚持使用dask.delayed。它似乎对您很有效。
  2. 使用Bag.to_dataframe方法,该方法期望一个字典包,可以自行进行数据帧转换。
"Original Answer"翻译成"最初的回答"

谢谢,我选择了选项1,并让延迟字典在分布式系统上持久化,这基本上给了我想要的结果(我认为使用bag可能更容易,但是在这里延迟服务也同样好)。不过我也会研究一下bag.to_dataframe,感谢您的建议。 - CFabry
当您拥有Pandas数据框的列表列表时,我们是否可以自己将列表展平,然后将其传递给dask.dataframe.from_delayed? - Jenna Kwon
是的。只要您提供一个包含Pandas数据框延迟对象列表,Dask就不会关心您如何使用Python创建该列表。 - MRocklin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接