Pandas的apply、rolling、groupby功能如何应用于多个输入和多个输出列?

6

最近一周我一直在努力使用 apply 函数来对整个 pandas dataframe 进行操作,包括使用 rolling 窗口函数,groupby 函数以及涉及多输入列和多输出列的函数。我在 Stack Overflow 上找到了大量关于这个主题的问题以及许多过时的答案。所以我开始为每个可能的 x 输入和输出、滚动、滚动与 groupby 结合等创建一个笔记本,并且我也注重 性能。由于我不是唯一遇到这些问题的人,我想在这里提供我的解决方案和可工作的示例,希望它能帮助任何现有/未来的pandas用户。

1个回答

7

重要提示

  1. 在pandas中,'apply & rolling'的组合有一个非常强的输出要求。您必须返回单个值。不能返回pd.Series、列表、数组或以任何形式嵌套数组,只能返回一个值,例如一个整数。这个要求使得尝试为多个列返回多个输出变得困难。我不明白为什么'apply & rolling'有这个要求,因为没有rolling时'apply'并没有这个要求。可能是由于某些内部pandas函数造成的。
  2. 'apply & rolling'与多个输入列的组合根本不起作用!想象一下一个包含2列、6行的数据框,您想应用一个自定义函数,并使用窗口大小为2的滚动窗口。您的函数应该获取一个包含2x2个值的输入数组——每个列的2个值,共2行。但似乎pandas不能同时处理rolling和多个输入列。我试图使用axis参数使其正常工作,但是:
    • Axis = 0,将对每列调用您的函数。在上述数据框中,它将调用您的函数10次(而不是12次,因为rolling=2),并且由于是针对每个列,它只提供了该列的2个滚动值...
    • Axis = 1,将对每行调用您的函数。这可能是您想要的,但是pandas不会提供2x2的输入。它实际上完全忽略了rolling,只提供一个包含两列值的行...
  3. 当使用'apply'与多个输入列时,可以提供一个名为raw(布尔)的参数。默认情况下,它为False,这意味着输入将是一个pd.Series,并且在值旁边包括索引。如果您不需要索引,则可以将raw设置为True以获得Numpy数组,这通常可以实现更好的性能。
  4. 当结合'rolling & groupby'时,它返回一个多级索引系列,不能很容易地用作新列的输入。最简单的解决办法是添加一个reset_index(drop=True),如此回答和评论中所述(Python - rolling functions for GroupBy object)。
  5. 您可能会问我,什么时候才需要使用滚动、groupby、带有多个输出的自定义函数呢?答案:最近我必须对一个包含500万条记录(速度/性能很重要)的数据集执行具有滑动窗口(rolling)的傅里叶变换,并对数据集中的不同批次(groupby)进行操作。我需要将傅里叶变换的功率和相位保存在不同的列中(多个输出)。大多数人可能只需要以下一些基本示例,但我相信对于机器学习/数据科学领域来说,更复杂的示例可能会有用。
  6. 让我知道您是否有更好、更清晰或更快的方法来执行以下任何解决方案。我会更新我的回答,这样我们都能受益!

代码示例

首先,我们创建一个数据框,该数据框将在下面的所有示例中使用,包括用于groupby示例的分组列。 对于滚动窗口和多个输入/输出列,我在以下所有代码示例中只使用了2个,但显然这可以是任何大于1的数字。

df = pd.DataFrame(np.random.randint(0,5,size=(6, 2)), columns=list('ab'))
df['group'] = [0, 0, 0, 1, 1, 1]
df = df[['group', 'a', 'b']]

它将会呈现如下形式:
group   a   b
0   0   2   2
1   0   4   1
2   0   0   4
3   1   0   2
4   1   3   2
5   1   3   0

输入1列,输出1列

基础知识

def func_i1_o1(x):    
    return x+1

df['c'] = df['b'].apply(func_i1_o1)


滚动

def func_i1_o1_rolling(x):
    return (x[0] + x[1])

df['d'] = df['c'].rolling(2).apply(func_i1_o1_rolling, raw=True)


滚动和分组

将重置索引的解决方案(请参见上述说明)添加到滚动函数中。

df['e'] = df.groupby('group')['c'].rolling(2).apply(func_i1_o1_rolling, raw=True).reset_index(drop=True)

输入两列,输出一列

基础

def func_i2_o1(x):
    return np.sum(x)

df['f'] = df[['b', 'c']].apply(func_i2_o1, axis=1, raw=True)


滚动(Rolling)

如上面的注释中在第2点解释的那样,对于两个输入并没有一个正常的解决方案。下面的解决方法使用'raw=False'以确保输入是pd.Series,这意味着我们也可以得到紧跟值后面的索引。这使我们能够按正确的索引从其他列获取值并将其用于计算。

def func_i2_o1_rolling(x):
    values_b = x
    values_c = df.loc[x.index, 'c'].to_numpy()
    return np.sum(values_b) + np.sum(values_c)

df['g'] = df['b'].rolling(2).apply(func_i2_o1_rolling, raw=False)


滚动和分组

将reset_index解决方案(见上文注释)添加到rolling函数中。

df['h'] = df.groupby('group')['b'].rolling(2).apply(func_i2_o1_rolling, raw=False).reset_index(drop=True)

输入1列,输出2列

基础知识

您可以通过返回pd.Series来使用“常规”解决方案:

def func_i1_o2(x):
    return pd.Series((x+1, x+2))

df[['i', 'j']] = df['b'].apply(func_i1_o2)

或者您可以使用zip / tuple组合,它的速度大约快8倍!

def func_i1_o2_fast(x):
    return x+1, x+2

df['k'], df['l'] = zip(*df['b'].apply(func_i1_o2_fast))


滚动计算

如上述注释中的第1点所解释的那样,当使用“滚动和应用”结合时,如果我们想要返回多个值,则需要一个解决方法。我找到了2个可行的解决方案。

1

def func_i1_o2_rolling_solution1(x):
    output_1 = np.max(x)
    output_2 = np.min(x)
    # Last index is where to place the final values: x.index[-1]
    df.at[x.index[-1], ['m', 'n']] = output_1, output_2
    return 0

df['m'], df['n'] = (np.nan, np.nan)
df['b'].rolling(2).apply(func_i1_o2_rolling_solution1, raw=False)

优点:所有操作都在一个函数中完成。
缺点:必须先创建列,并且由于不使用原始输入,速度较慢。

2

rolling_w = 2
nan_prefix = (rolling_w - 1) * [np.nan]
output_list_1 = nan_prefix.copy()
output_list_2 = nan_prefix.copy()

def func_i1_o2_rolling_solution2(x):
    output_list_1.append(np.max(x))
    output_list_2.append(np.min(x))
    return 0

df['b'].rolling(rolling_w).apply(func_i1_o2_rolling_solution2, raw=True)
df['o'] = output_list_1
df['p'] = output_list_2
优点:使用原始输入速度快了约两倍。并且,由于它不使用索引来设置输出值,所以代码看起来更清晰(至少对我是这样的)。
缺点:您必须自己创建nan前缀,并且需要多写一些代码行。

滚动&分组

通常,我会使用上述更快的第二个解决方案。然而,由于我们正在组合分组和滚动,这意味着您需要在数据集中适当的索引位置手动设置NaN或零(取决于组数)。对我来说,似乎在组合滚动、分组和多个输出列时,第一种解决方案更容易,可以自动解决NaN/分组问题。再次强调,我在最后使用reset_index解决方案。

def func_i1_o2_rolling_groupby(x):
    output_1 = np.max(x)
    output_2 = np.min(x)
    # Last index is where to place the final values: x.index[-1]
    df.at[x.index[-1], ['q', 'r']] = output_1, output_2
    return 0

df['q'], df['r'] = (np.nan, np.nan)
df.groupby('group')['b'].rolling(2).apply(func_i1_o2_rolling_groupby, raw=False).reset_index(drop=True)

输入2列,输出2列

基础

我建议使用与i1_o2相同的“快速”方式,唯一的区别是你要获取2个输入值来使用。

def func_i2_o2(x):
    return np.mean(x), np.median(x)

df['s'], df['t'] = zip(*df[['b', 'c']].apply(func_i2_o2, axis=1))


滚动处理

由于我使用一个解决方法来应用带有多个输入的滚动处理,而我又使用另一个解决方案来进行多个输出的滚动处理,因此您可以猜到我需要将它们结合在一起。
1. 使用索引从其他列中获取值(请参见func_i2_o1_rolling)
2. 在正确的索引上设置最终的多个输出(请参见func_i1_o2_rolling_solution1)

def func_i2_o2_rolling(x):
    values_b = x.to_numpy()
    values_c = df.loc[x.index, 'c'].to_numpy()
    output_1 = np.min([np.sum(values_b), np.sum(values_c)])
    output_2 = np.max([np.sum(values_b), np.sum(values_c)])    
    # Last index is where to place the final values: x.index[-1]
    df.at[x.index[-1], ['u', 'v']] = output_1, output_2
    return 0

df['u'], df['v'] = (np.nan, np.nan)
df['b'].rolling(2).apply(func_i2_o2_rolling, raw=False)


滚动与分组

向rolling函数添加reset_index解决方案(见上述注释)。

def func_i2_o2_rolling_groupby(x):
    values_b = x.to_numpy()
    values_c = df.loc[x.index, 'c'].to_numpy()
    output_1 = np.min([np.sum(values_b), np.sum(values_c)])
    output_2 = np.max([np.sum(values_b), np.sum(values_c)])    
    # Last index is where to place the final values: x.index[-1]
    df.at[x.index[-1], ['w', 'x']] = output_1, output_2
    return 0

df['w'], df['x'] = (np.nan, np.nan)
df.groupby('group')['b'].rolling(2).apply(func_i2_o2_rolling_groupby, raw=False).reset_index(drop=True)

1
我使用numpy绕过了滚动/应用的限制,具体使用了np.lib.stride_tricks.sliding_window_viewnp.apply_along_axis。这需要将输出数组添加回原始pandas DF并填充初始行。不确定与你展示的方法相比性能如何。不确定你是否对pandas之外的解决方案感兴趣? - Olivier
@Olivier 很高兴你找到了另一种规避限制的方法!我倾向于尽可能少地使用框架,所以对于使用你的解决方案还不确定,但我对性能比较很好奇。你有尝试过什么吗? - Bob de Graaf
感谢您的帖子-在搜索了几个小时后,它绝对是我的起点。您是否有任何想法如何处理时间序列,例如.rolling("10d", on="date", closed="left")?如果我使用groupby("some_col")["date"]进行分组,那么我会收到错误消息ValueError: invalid on specified as date, must be a column (of DataFrame), an Index or None - Jossy
@Jossy 这感觉更像是一个新问题,我不知道你的 df 究竟长什么样。但是给我发私信,我会尽力帮助你 :) - Bob de Graaf

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接