按组选择最大行 - pandas 性能问题

7

我正在每个组内选择一行最大值,并使用groupby/agg返回索引值,并使用loc选择行。

例如,按"Id"分组,然后选择具有最高"delta"值的行:

selected_idx = df.groupby("Id").apply(lambda df: df.delta.argmax())
selected_rows = df.loc[selected_idx, :]

然而,这种方法非常慢。实际上,在处理1300万行的查询时,我的i7/16GB RAM笔记本电脑会挂起。

我有两个问题要问专家:

  1. 我如何使此查询在pandas中快速运行?我做错了什么?
  2. 为什么这个操作如此昂贵?

[更新] 非常感谢@unutbu的分析! 使用sort_drop! 在我的i7 / 32GRAM机器上,groupby + idxmax几乎需要14个小时(从未返回任何内容),然而sort_drop在不到一分钟的时间内完成了处理!

我仍然需要查看pandas如何实现每种方法,但暂时解决了问题!我喜欢StackOverflow。


相关链接:如何找出每列中第一个非NaN值是否为该列的最大值?。另外,了解这些答案的性能比较和扩展性也会很不错。 - smci
2个回答

14
最快的选项不仅取决于DataFrame的长度(在这种情况下,约为13M行),还取决于组数。以下是性能图,比较了多种查找每个组中最大值的方法:
如果只有一些(大)组,则使用using_idxmax可能是最快的选择:enter image description here 如果有许多(小)组且DataFrame不太大,则使用using_sort_drop可能是最快的选择:enter image description here 但请记住,尽管using_sort_drop、using_sort和using_rank开始看起来非常快,但随着N = len(df)的增加,它们相对于其他选项的速度很快就会消失。对于足够大的N,即使有许多组,using_idxmax也成为最快的选项。
使用using_sort_dropusing_sortusing_rank对DataFrame(或DataFrame内的分组)进行排序。平均排序时间复杂度为O(N * log(N)),而其他方法使用O(N)操作。这就是为什么像using_idxmax这样的方法在处理非常大的数据集时要比using_sort_drop更快的原因。
请注意,基准测试结果可能会因多种原因而有所不同,包括机器规格、操作系统和软件版本。因此,在自己的机器上运行基准测试,并使用适合自己情况的测试数据非常重要。
根据上面的性能图表,如果您的DataFrame有许多(小)分组,则使用using_sort_drop可能是值得考虑的选项,尤其是当它有1300万行时。否则,我会怀疑using_idxmax是最快的选项——但再次强调,在自己的机器上检查基准测试非常重要。

以下是我用来制作perfplots的设置:

import numpy as np
import pandas as pd 
import perfplot

def make_df(N):
    # lots of small groups
    df = pd.DataFrame(np.random.randint(N//10+1, size=(N, 2)), columns=['Id','delta'])
    # few large groups
    # df = pd.DataFrame(np.random.randint(10, size=(N, 2)), columns=['Id','delta'])
    return df


def using_idxmax(df):
    return df.loc[df.groupby("Id")['delta'].idxmax()]

def max_mask(s):
    i = np.asarray(s).argmax()
    result = [False]*len(s)
    result[i] = True
    return result

def using_custom_mask(df):
    mask = df.groupby("Id")['delta'].transform(max_mask)
    return df.loc[mask]

def using_isin(df):
    idx = df.groupby("Id")['delta'].idxmax()
    mask = df.index.isin(idx)
    return df.loc[mask]

def using_sort(df):
    df = df.sort_values(by=['delta'], ascending=False, kind='mergesort')
    return df.groupby('Id', as_index=False).first()

def using_rank(df):
    mask = (df.groupby('Id')['delta'].rank(method='first', ascending=False) == 1)
    return df.loc[mask]

def using_sort_drop(df):
    # Thanks to jezrael
    # https://dev59.com/kqvka4cB1Zd3GeqPtHCB#50389889?noredirect=1#comment87795818_50389889
    return df.sort_values(by=['delta'], ascending=False, kind='mergesort').drop_duplicates('Id')

def using_apply(df):
    selected_idx = df.groupby("Id").apply(lambda df: df.delta.argmax())
    return df.loc[selected_idx]

def check(df1, df2):
    df1 = df1.sort_values(by=['Id','delta'], kind='mergesort').reset_index(drop=True)
    df2 = df2.sort_values(by=['Id','delta'], kind='mergesort').reset_index(drop=True)
    return df1.equals(df2)

perfplot.show(
    setup=make_df,
    kernels=[using_idxmax, using_custom_mask, using_isin, using_sort, 
             using_rank, using_apply, using_sort_drop],
    n_range=[2**k for k in range(2, 20)],
    logx=True,
    logy=True,
    xlabel='len(df)',
    repeat=75,
    equality_check=check)

另一种基准测试方法是使用 IPython %timeit

In [55]:  df = make_df(2**20)

In [56]: %timeit using_sort_drop(df)
1 loop, best of 3: 403 ms per loop

In [57]: %timeit using_rank(df)
1 loop, best of 3: 1.04 s per loop

In [58]: %timeit using_idxmax(df)
1 loop, best of 3: 15.8 s per loop

3
我曾经意识到最快的方式是 df = df.sort_values(by=['delta'], ascending=False).drop_duplicates('Id'),可以将其加入计时吗? - jezrael
1
当然。现在正在运行基准测试;需要一些时间。 - unutbu
1
@jezrael:感谢您的建议。您的方法(我在上面称之为“using_sort_drop”)对于中等大小的数据框确实更快,特别是如果有许多小组。但是对于非常大的数据框,“using_idxmax”将更快。 - unutbu
1
"Moderately large" 取决于群组数量。如果群组很少,using_sort_droplen(df) = 100K 左右具有优势。但如果存在许多小的群组,它在 len(df) around 100M 仍可能是赢家。因此,这在很大程度上取决于您 DataFrame 的性质。最好安装 perfplot 和/或 IPython 并进行一些基准测试。 - unutbu
谢谢这个 - 对于我来说,在一个20MM行的数据框上,排序/删除绝对是比其他方法更有效的。 - trance_dude
显示剩余5条评论

5

Using Numba's jit

from numba import njit
import numpy as np

@njit
def nidxmax(bins, k, weights):
    out = np.zeros(k, np.int64)
    trk = np.zeros(k)
    for i, w in enumerate(weights - (weights.min() - 1)):
        b = bins[i]
        if w > trk[b]:
            trk[b] = w
            out[b] = i
    return np.sort(out)

def with_numba_idxmax(df):
    f, u = pd.factorize(df.Id)
    return df.iloc[nidxmax(f, len(u), df.delta.values)]

借鉴自@unutbu

def make_df(N):
    # lots of small groups
    df = pd.DataFrame(np.random.randint(N//10+1, size=(N, 2)), columns=['Id','delta'])
    # few large groups
    # df = pd.DataFrame(np.random.randint(10, size=(N, 2)), columns=['Id','delta'])
    return df

Prime jit

with_numba_idxmax(make_df(10));

测试

df = make_df(2**20)


%timeit with_numba_idxmax(df)
%timeit using_sort_drop(df)

47.4 ms ± 99.8 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
194 ms ± 451 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

无法感谢您足够,@piRSqured,您的代码使我的代码运行速度提高了5倍以上。 - mathguy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接