Dask数据框根据列或函数拆分分区

15

我最近开始了解Dask大数据处理。关于在并行应用操作的效率问题,我有一个疑问。

比如我有一些销售数据,就像这样:

customerKey    productKey    transactionKey    grossSales  netSales      unitVolume    volume transactionDate
-----------  --------------  ----------------  ----------  --------      ----------    ------ --------------------
    20353           189            219548     0.921058     0.921058              1         1  2017-02-01 00:00:00
  2596618           189            215015     0.709997     0.709997              1         1  2017-02-01 00:00:00
 30339435           189            215184     0.918068     0.918068              1         1  2017-02-01 00:00:00
 32714675           189            216656     0.751007     0.751007              1         1  2017-02-01 00:00:00
 39232537           189            218180     0.752392     0.752392              1         1  2017-02-01 00:00:00
 41722826           189            216806     0.0160143    0.0160143             1         1  2017-02-01 00:00:00
 46525123           189            219875     0.469437     0.469437              1         1  2017-02-01 00:00:00
 51024667           189            215457     0.244886     0.244886              1         1  2017-02-01 00:00:00
 52949803           189            215413     0.837739     0.837739              1         1  2017-02-01 00:00:00
56526281 189 220261 0.464716 0.464716 1 1 2017-02-01 00:00:00 56776211 189 220017 0.272027 0.272027 1 1 2017-02-01 00:00:00 58198475 189 215058 0.805758 0.805758 1 1 2017-02-01 00:00:00 63523098 189 214821 0.479798 0.479798 1 1 2017-02-01 00:00:00 65987889 189 217484 0.122769 0.122769 1 1 2017-02-01 00:00:00 74607556 189 220286 0.564133 0.564133 1 1 2017-02-01 00:00:00 75533379 189 217880 0.164387 0.164387 1 1 2017-02-01 00:00:00 85676779 189 215150 0.0180961 0.0180961 1 1 2017-02-01 00:00:00 88072944 189 219071 0.492753 0.492753 1 1 2017-02-01 00:00:00 90233554 189 216118 0.439582 0.439582 1 1 2017-02-01 00:00:00 91949008 189 220178 0.1893 0.1893 1 1 2017-02-01 00:00:00我想进行几个不同的分组操作,首先是以customerKey为基础的groupby-apply。 然后是在customerkey上的groupby-sum,其中一列将是前面groupby-apply的结果。
我能想到的最有效的方法是将此数据框拆分为按客户密钥的块划分的分区。例如,我可以使用以下分区方案将数据框拆分为4个块(伪代码):
按customerKey%4分区
然后我可以使用map_partitions为每个分区执行这些groupby-apply,最后返回结果。但是似乎Dask强制我为要执行的每个groupby都做一次shuffle。
有没有办法根据列的值重新分区?
目前,对于仅有约80,000行的数据框,使用4个worker需要约45秒。我计划扩展到数万亿行的数据框,而且已经看起来会非常缓慢。
我是否忽略了Dask的基本知识?
2个回答

10
你可以将你的列设置为索引。
df = df.set_index('customerKey')

这将按该列排序您的数据并跟踪哪些值范围在哪个分区中。正如您所指出的,这可能是一项昂贵的操作,因此您可能希望将其保存在某个地方。

可以保存在内存中。

df = df.persist()

或者在磁盘上

df.to_parquet('...')
df = df.read_parquet('...')

4
啊哈,所以如果你将索引设置为customerKey,你就可以保证在每个分区中有独立的customerKey块。很棒,感谢您提供这个信息。当使用Dask dataframe时,将索引设置为一个列,然后执行df = df.map_partitions(f).compute()是否是一种标准做法?虽然文档中没有详细提到,但它似乎是我尝试做的最有效的解决方案。 - Roger Thomas
1
正确的,请参见http://dask.pydata.org/en/latest/dataframe-design.html#partitions获取更多信息。 - MRocklin
1
有点常见。你也可以使用groupby-apply,但考虑到你上面表述问题的方式,我认为set_index/map_partitions解决方案对你来说更自然。 - MRocklin
6
我已尝试过两种方法,groupby-apply 所需时间大约比 map_partitions 多十倍。 - Roger Thomas
这将导致重复索引,这不是一个问题吗? - Itamar Mushkin
显示剩余2条评论

-1

将索引设置为所需列并使用map_partitions比groupby更高效


6
虽然这可能对楼主有所帮助,但最好添加更多的细节、例子等。请提供不需要询问者澄清的答案。 - Til
1
@43shahin,我也想了解更多细节。 - scottlittle

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接