Dask:使用groupby获取具有最大值的行的方法

5

使用Pandas可以使用transform解决同样的问题,详见这里。 使用Dask,我找到的唯一可行的解决方案是使用合并。我想知道是否有其他方法可以实现它。

1个回答

2
首先,我想重写你原来问题中提到的脚本,以确保我理解其意图。据我所知,如下所示的重写,您基本上希望找到一种方法,从每个唯一的foobar组合中提取具有最高计数cnt值的值。下面大致展示了参考脚本如何仅使用Pandas实现此功能。
# create an example dataframe
df = pd.DataFrame({
        'foo' : ['MM1', 'MM1', 'MM1', 'MM2', 'MM2', 'MM2', 'MM4', 'MM4', 'MM4'],
        'bar' : ['S1', 'S1', 'S3', 'S3', 'S4', 'S4', 'S2', 'S2', 'S2'],
        'cnt' : [3, 2, 5, 8, 10, 1, 2, 2, 7],
        'val' : ['a', 'n', 'cb', 'mk', 'bg', 'dgb', 'rd', 'cb', 'uyi'],
    })


grouped_df = (df.groupby(['foo', 'bar'])            # creates a double nested indices
                .agg({'cnt': 'max'})                # returns max value from each grouping
                .rename(columns={'cnt': 'cnt_max'}) # renames the col to avoid conflicts on merge later
                .reset_index())                     # makes the double nested indices columns instead

merged_df = pd.merge(df, grouped_df, how='left', on=['foo', 'bar'])

# note: I believe a shortcoming here is that if ther eis more than one match, this would 
# return multiple results for some pairings...
final_df = merged_df[merged_df['cnt'] == merged_df['cnt_max']]

现在,我就给大家介绍一个适用于Dask的版本,如下所示。详见注释。
# create an example dataframe
df = pd.DataFrame({
        'foo' : ['MM1', 'MM1', 'MM1', 'MM2', 'MM2', 'MM2', 'MM4', 'MM4', 'MM4'],
        'bar' : ['S1', 'S1', 'S3', 'S3', 'S4', 'S4', 'S2', 'S2', 'S2'],
        'cnt' : [3, 2, 5, 8, 10, 1, 2, 2, 7],
        'val' : ['a', 'n', 'cb', 'mk', 'bg', 'dgb', 'rd', 'cb', 'uyi'],
    })

# I'm not sure if we can rely on val to be a col of unique values so I am just going to 
# make a new column that is the id for this, now on a very large dataframe that wouldn't 
# fit in memory this may not be a reasonable method of creating a unique new column but 
# for the purposes of this example this will be sufficient
df['id'] = np.arange(len(df))

# now let's convert this dataframe into a Dask dataframe
# we will only use 1 partition because this is a small sample and would use more in a real world case
ddf = dd.from_pandas(df, npartitions=1)

# create a function that take the results of the grouped by sub dataframes and returns the row
# where the cnt is greatest
def select_max(grouped_df):
    row_with_max_cnt_index = grouped_df['cnt'].argmax()
    row_with_max_cnt = grouped_df.loc[row_with_max_cnt_index]
    return row_with_max_cnt['id']

# now chain that function into an apply run on the output of the groupby operation
# note: this also may not be the best strategy if the resulting list is too long
# if that is the case, will need to better thread the output of this into the next step
keep_ids = ddf.groupby(['foo', 'bar']).apply(select_max, meta=pd.Series()).compute()

# this is pretty straightforward, just get the rows that match the ids from the max cnt applied method
subset_df = ddf[ddf['id'].isin(keep_ids)]
print(subset_df.compute())

嗨@kuanb,感谢您的回复。我尝试了您的解决方案,但它比我提供的那个慢得多(100倍),而且也不能正常工作。 - rpanai
你能解释一下哪里出了问题吗?你遇到了什么错误吗?另外,你正在处理什么大小的数据集?apply函数是一种逐行方法,非常慢,如果可能的话应该避免使用。你的性能问题可能是由于你所做的任何事情可能不需要Dask。如果数据集足够大,那么也许你可以利用Dask Distributed来最有效地将操作并行化在多个资源之间。 - kuanb
你的代码没有打印错误,但是返回的数据框比预期的大了两倍。我猜测是因为相同的日期时间并没有只取最后一个。我的原始数据框包含大约150万行。 - rpanai
我不确定你的第二句话是什么意思。在我们使用的示例中没有日期时间,也没有选择任何与时间相关的逻辑... 鉴于您的数据集很大,您是否尝试先在Pandas中运行此代码,并检查是否满足您的需求? - kuanb
@kuanb,你的第一个pandas示例可以直接使用dask,直到倒数第二行都可以正常工作。最后一行没有测试过。 - muon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接