Dask数据框架:`set_index`能否将单个索引放入多个分区?

6

根据经验,每当您在Dask数据框上使用 set_index 时,Dask都会将索引相同的行放入单个分区中,即使这导致分区严重不平衡。

下面是示例:

import pandas as pd
import dask.dataframe as dd

users = [1]*1000 + [2]*1000 + [3]*1000

df = pd.DataFrame({'user': users})
ddf = dd.from_pandas(df, npartitions=1000)

ddf = ddf.set_index('user')

counts = ddf.map_partitions(lambda x: len(x)).compute()
counts.loc[counts > 0]
# 500    1000
# 999    2000
# dtype: int64

然而,我没有在任何地方找到这种行为的保证。

我曾经试图自己查看代码,但是放弃了。我相信其中一个相互关联的函数可能掌握着答案:

当您使用set_index时,一个单独的索引是否永远不可能在两个不同的分区中?如果不是,那么在什么条件下会保持此属性?


赏金:我将奖励那些从可靠来源中汲取的答案。例如,参考实现以显示必须保持此属性。

3个回答

2
“一个索引值不能同时存在于两个不同的分区”这种说法是错误的。Dask甚至会鼓励这样做。然而,由于“set_index”函数中存在一个错误(见这里),所有数据最终仍会被放在一个分区中。
以下是一个极端的例子(每行数据都相同,只有一个值不同):
In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] + [1] * 20})
In [4]: ddf = dd.from_pandas(df, npartitions=10)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
Out[6]: (0, 0, 0, 0, 0, 0, 0, 1)

正如您所看到的,Dask希望将0分割成多个分区。然而当洗牌实际发生时,所有的0仍然会在一个分区中:
In [7]: import dask
In [8]: dask.compute(s.to_delayed())  # easy way to see the partitions separately
Out[8]: 
([Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [],
  Empty DataFrame
  Columns: []
  Index: [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]],)

这是因为决定行属于哪个输出分区的代码在考虑divisions中是否有重复值时不起作用。将divisions视为一系列数据,它使用带有side="right"参数的searchsorted函数,因此所有数据总是最终落入最后一个分区。
当该问题得到解决时,我会更新这个答案。

0

我刚刚注意到Dask的shuffle文档中写道:

此操作后,具有相同on值的行将位于同一分区中。

这似乎证实了我的经验观察。


0
单个索引永远不能存在于两个不同的分区中吗?
就实际目的而言,如果我理解正确,答案是肯定的。
通常情况下,dask dataframe 将有多个分区,dask可能知道与每个分区相关的索引值(请参见Partitions)。如果 dask 确实知道哪个分区包含哪个索引范围,则这将反映在 df.divisions 输出中(如果没有,则此调用的结果将为 None)。
运行 .set_index 时,dask 将计算 divisions,在确定 divisions 时,它似乎要求 divisions 是连续且唯一的(除了最后一个元素)。相关代码在 这里
因此有两个可能的跟进问题:为什么不允许任何非连续索引,以及之前的特定情况中为什么不允许重复索引在分区中。
关于第一个问题:对于小型数据,可以考虑设计允许非排序索引,但是可以想象一般的非排序索引不会很好地扩展,因为Dask需要以某种方式存储每个分区的索引。
关于第二个问题:似乎应该是可能的,但现在实现似乎不正确。请参见下面的代码片段:
# use this to generate 10 indexed partitions
import pandas as pd

for user in range(10):
    
    df = pd.DataFrame({'user_col': [user//3]*100})
    df['user'] = df['user_col']
    df = df.set_index('user')
    df.index.name = 'user_index'
    
    df.to_parquet(f'test_{user}.parquet', index=True)


# now load them into a dask dataframe
import dask.dataframe as dd

ddf = dd.read_parquet('test_*.parquet')

# dask will know about the divisions
print(ddf.known_divisions) # True

# further evidence
print(ddf.divisions) # (0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3)

# this should show three partitions, but will show only one
print(ddf.loc[0].npartitions) # 1

谢谢您的回答。我认为我的问题可能需要更清晰地表明我特别关注Dask的set_index行为。这个答案非常相关和有趣,但并没有解决我的问题。 - Dahn
顺便说一下,在你的例子中,分区实际上是未知的(known_divisionsFalse)。我正在使用 Dask 2021.08.01、Pandas 1.3.1 和 PyArrow 5.0.0 - Dahn
没关系,我对dask的内部不是很有信心。 - SultanOrazbayev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接