Dask图执行和内存使用

4

我正在使用dask构建一个非常大的DAG,以提交给分布式调度程序,在其中节点操作数据帧,这些数据帧本身可能相当大。一种模式是,我有大约50-60个函数,它们加载数据并构造pandas数据帧,每个数据帧都有几百MB(并且在逻辑上代表单个表的分区)。我想将它们连接成一个单独的dask数据帧,以便下游节点在图中使用,同时最小化数据移动。我像这样链接任务:

dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
dfs = [dask.delayed(pandas_to_dask)(df) for df in dfs]
return dask.delayed(concat_all)(dfs)

在哪里

def pandas_to_dask(df):
    return dask.dataframe.from_pandas(df).to_delayed()

我曾尝试过各种concat_all实现方法,但这个看起来很合理:

def concat_all(dfs):
    dfs = [dask.dataframe.from_delayed(df) for df in dfs]
    return dask.dataframe.multi.concat(dfs, axis='index', join='inner')

所有的pandas数据帧在其索引上都是不相交的,并且已排序/单调。

然而,我在这个concat_all函数上遇到了问题(集群管理器因超出内存预算而杀死它们),尽管每个节点的内存预算实际上相当大,我不希望它移动数据。我相当确定,在使用dask数据框的图节点内调用compute()之前,我总是对合理的数据子集进行切片。

我正在尝试使用--memory-limit,但目前还没有成功。我至少正确地解决了问题吗?我是否有所遗漏?

1个回答

5

给定延迟值的列表,这些值计算为pandas数据帧

>>> dfs = [dask.delayed(load_pandas)(i) for i in disjoint_set_of_dfs]
>>> type(dfs[0].compute())  # just checking that this is true
pandas.DataFrame

将它们传递给dask.dataframe.from_delayed函数。
>>> ddf = dd.from_delayed(dfs)

默认情况下,这将运行第一个计算以确定元数据(列名、数据类型等对于dask.dataframe很重要)。您可以通过构建示例数据帧并将其传递给meta=关键字来避免这种情况。

>>> meta = pd.DataFrame({'value': [1.0], 'name': ['foo'], 'id': [0]})
>>> ddf = dd.from_delayed(dfs, meta=meta)

这个示例笔记本可能也会有所帮助。

通常情况下,您不需要从其他dask函数中调用dask函数(就像您通过延迟from_pandas调用那样)。Dask.dataframe函数本身已经具有惰性,不需要进一步地延迟。


感谢您的快速回复。我注意到dd.from_delayed(dfs)会立即评估dfs[0]以提取元数据。由于某种原因,这对我造成了问题。是否有另一种方法可以推迟此评估,直到图形完全构建?我将尝试组合一个可重现的实例。 - Adam Klein
您可以在meta=关键字中提供一个示例数据框。我会在答案中添加一个示例。 - MRocklin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接