dask DataFrame.assign崩溃了dask图

3

我在使用dask DataFrame.append时遇到了问题。我从主数据生成了许多派生特征,并将它们附加到主数据框中。之后,任何一组列的dask图都会崩溃。下面是一个小例子:

%pylab inline
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.dot import dot_graph

df=pd.DataFrame({'x%s'%i:np.random.rand(20) for i in range(5)})
ddf = dd.from_pandas(df, npartitions=2)

dot_graph(ddf['x0'].dask)

这里是预期的Dask图

g=ddf.assign(y=ddf['x0']+ddf['x1'])
dot_graph(g['x0'].dask)

这里的图表是通过无关计算扩大了同一列的内容。

想象一下,我有很多很多生成的列。因此,任何特定列的计算图包括所有其他列的无关计算。即在我的情况下,len(ddf ['someColumn'].dask) > 100000。因此,它很快变得不可用。

那么我的问题是,这个问题可以解决吗?是否有任何现有的方法可以解决这个问题?如果没有 - 我应该寻找什么方向来实现这一点?

谢谢!

1个回答

6

与其不断地给Dask数据框分配新列,您可能希望构建多个Dask系列,然后在最后将它们全部连接起来。

因此,不要这样做:

df['x'] = df.w + 1
df['y'] = df.x * 10
df['z'] = df.y ** 2

请执行此操作

x = df.w + 1
y = x + 10
z = y * 2
df = df.assign(x=x, y=y, z=z)

或者这样:
dd.concat([df, x, y, z], axis=1)

这可能仍然会导致图表中的任务数量相同,但可能会减少内存复制次数。另外,如果您所有的转换都是逐行进行的,则可以构建一个Pandas函数,并将其映射到所有分区。
def f(part):
    part = part.copy()
    part['x'] = part.w + 1
    part['y'] = part.x * 10
    part['z'] = part.y ** 2
    return part

df = df.map_partitions(f)

此外,虽然一个拥有一百万节点的任务图并不理想,但也是可以接受的。我见过更大的图形能够轻松运行。

感谢您的快速回复。目前我正在使用df = df.assign(x=x, y=y, z=z)一次性分配所有系列,但是对于不需要这些计算的列,图中会有额外的节点。是否有一种优化图表的方法,使得当您获取列的子集时,它会裁剪所有其他计算?我的解决方法是计算所有内容,保存到hdf,加载它,然后我可以按需访问某些列的子集。但是然后我遇到了hdf列长度限制,并再次分块数据,再次选择列的子集。 - oxymoron
2
如果你只使用了 xy 而没有使用 z,那么 Dask 将从计算中删除 z。但是一旦你将它们全部分配到 dataframe 中,你就必须全部使用它们。Dask 不执行像你想要的高级优化。您可以尝试将其持久化为 Parquet 而不是 HDF5。读取几列也比 HDF5 快得多。 - MRocklin
谢谢,我会尝试使用Parquet。 - oxymoron

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接