我正在使用dask构建一个非常大的DAG,以提交给分布式调度程序,在其中节点操作数据帧,这些数据帧本身可能相当大。一种模式是,我有大约50-60个函数,它们加载数据并构造pandas数据帧,每个数据帧都有几百MB(并且在逻辑上代表单个表的分区)。我想将它们连接成一个单独的dask数据帧,以便下游节点在图中使用,同时最小化数据移动。我像这样链接任务:
dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)
在哪里
def pandas_to_dask(df):
return dask.dataframe.from_pandas(df).to_delayed()
我曾尝试过各种concat_all
实现方法,但这个看起来很合理:
def concat_all(dfs):
dfs = [dask.dataframe.from_delayed(df) for df in dfs]
return dask.dataframe.multi.concat(dfs, axis='index', join='inner')
所有的pandas数据帧在其索引上都是不相交的,并且已排序/单调。
然而,我在这个concat_all
函数上遇到了问题(集群管理器因超出内存预算而杀死它们),尽管每个节点的内存预算实际上相当大,我不希望它移动数据。我相当确定,在使用dask数据框的图节点内调用compute()
之前,我总是对合理的数据子集进行切片。
我正在尝试使用--memory-limit
,但目前还没有成功。我至少正确地解决了问题吗?我是否有所遗漏?
dfs[0]
以提取元数据。由于某种原因,这对我造成了问题。是否有另一种方法可以推迟此评估,直到图形完全构建?我将尝试组合一个可重现的实例。 - Adam Kleinmeta=
关键字中提供一个示例数据框。我会在答案中添加一个示例。 - MRocklin