Pandas在滚动计算中使用apply函数并输出多列结果

7
我正在编写一段代码,它将对一个函数应用滚动窗口,并返回多列数据。
输入:Pandas序列
期望输出:三列DataFrame
def fun1(series, ):
    # Some calculations producing numbers a, b and c
    return {"a": a, "b": b, "c": c} 

res.rolling('21 D').apply(fun1)

res的内容:

time
2019-09-26 16:00:00    0.674969
2019-09-26 16:15:00    0.249569
2019-09-26 16:30:00   -0.529949
2019-09-26 16:45:00   -0.247077
2019-09-26 17:00:00    0.390827
                         ...   
2019-10-17 22:45:00    0.232998
2019-10-17 23:00:00    0.590827
2019-10-17 23:15:00    0.768991
2019-10-17 23:30:00    0.142661
2019-10-17 23:45:00   -0.555284
Length: 1830, dtype: float64

错误:

TypeError: must be real number, not dict

我尝试过以下方法:

  • 在apply中更改raw=True
  • 在apply中使用lambda函数
  • 将fun1中的结果作为列表/NumPy数组/数据帧/系列返回。

我还查阅了许多相关的SO帖子,其中包括:

但是,没有一个特定的解决方案可以解决这个问题。

有没有一种简单的解决方案?

3个回答

7

这里是一个使用rolling的“hacky”答案,生成一个DataFrame:

import pandas as pd
import numpy as np

dr = pd.date_range('09-26-2019', '10-17-2019', freq='15T')
data = np.random.rand(len(dr))

s = pd.Series(data, index=dr)

output = pd.DataFrame(columns=['a','b','c'])

row = 0

def compute(window, df):
    global row
    a = window.max()
    b = window.min()
    c = a - b
    df.loc[row,['a','b','c']] = [a,b,c]
    row+=1    
    return 1
    
s.rolling('1D').apply(compute,kwargs={'df':output})

output.index = s.index

似乎rolling apply函数总是期望返回一个数字,以便立即基于计算生成新的Series。
我通过创建一个新的output DataFrame(带有所需的输出列),并在函数内部写入该DataFrame来解决这个问题。我不确定是否有一种方法可以在滚动对象中获取索引,因此我使用global来使递增计数器以编写新行。但是,考虑到上述一点,您需要return一些数字。因此,尽管实际的rolling操作返回一系列1output被修改:
In[0]:
s

Out[0]:
2019-09-26 00:00:00    0.106208
2019-09-26 00:15:00    0.979709
2019-09-26 00:30:00    0.748573
2019-09-26 00:45:00    0.702593
2019-09-26 01:00:00    0.617028
  
2019-10-16 23:00:00    0.742230
2019-10-16 23:15:00    0.729797
2019-10-16 23:30:00    0.094662
2019-10-16 23:45:00    0.967469
2019-10-17 00:00:00    0.455361
Freq: 15T, Length: 2017, dtype: float64

In[1]:
output

Out[1]:
                           a         b         c
2019-09-26 00:00:00  0.106208  0.106208  0.000000
2019-09-26 00:15:00  0.979709  0.106208  0.873501
2019-09-26 00:30:00  0.979709  0.106208  0.873501
2019-09-26 00:45:00  0.979709  0.106208  0.873501
2019-09-26 01:00:00  0.979709  0.106208  0.873501
                      ...       ...       ...
2019-10-16 23:00:00  0.980544  0.022601  0.957943
2019-10-16 23:15:00  0.980544  0.022601  0.957943
2019-10-16 23:30:00  0.980544  0.022601  0.957943
2019-10-16 23:45:00  0.980544  0.022601  0.957943
2019-10-17 00:00:00  0.980544  0.022601  0.957943

[2017 rows x 3 columns]

这感觉更像是对`rolling`的一种利用,而不是预期使用,所以我很想看到更优雅的答案。
更新:感谢@JuanPi,您可以使用此答案获得滚动窗口索引。因此,一个非全局的答案可能如下:
def compute(window, df):
    a = window.max()
    b = window.min()
    c = a - b
    df.loc[window.index.max(),['a','b','c']] = [a,b,c]  
    return 1

2
您可以使用此答案中的技巧https://dev59.com/s1IH5IYBdhLWcg3wLqXB#60918101获取当前窗口的索引。 - JuanPi
@JuanPi 感谢分享,我正想问呢!我更新了我的回答并加入了此内容。 - Tom
不算太hacky,你基本上是利用pandas的rolling功能作为窗口生成器。通常的滚动窗口不会包含前导NaNs,但如果需要,它们可以被添加在前面。 - GratefulGuest

4

这个技巧对我来说似乎有效,尽管滚动的其他功能无法应用于此解决方案。然而,由于多进程处理,应用程序的速度显着提高。

from multiprocessing import Pool
import functools


def apply_fn(indices, fn, df):
    return fn(df.loc[indices])
              
    
def rolling_apply(df, fn, window_size, start=None, end=None):
    """
    The rolling application of a function fn on a DataFrame df given the window_size
    """
    x = df.index
    if start is not None:
        x = x[x >= start]
    if end is not None:
        x = x[x <= end]
    if type(window_size) == str:
        delta = pd.Timedelta(window_size)
        index_sets = [x[(x > (i - delta)) & (x <= i)] for i in x]
    else: 
        assert type(window_size) == int, "Window size should be str (representing Timedelta) or int"
        delta = window_size
        index_sets = [x[(x > (i - delta)) & (x <= i)] for i in x]
    
    with Pool() as pool:
        result = list(pool.map(functools.partial(apply_fn, fn=fn, df=df), index_sets))
    result = pd.DataFrame(data=result, index=x)
        
    return result

在设置好以上功能后,将该函数插入到自定义rolling_function中。

result = rolling_apply(res, fun1, "21 D")

结果内容:

                    a           b           c
time            
2019-09-26 16:00:00 NaN         NaN         NaN
2019-09-26 16:15:00 0.500000    0.106350    0.196394
2019-09-26 16:30:00 0.500000    0.389759    -0.724829
2019-09-26 16:45:00 2.000000    0.141436    -0.529949
2019-09-26 17:00:00 6.010184    0.141436    -0.459231
... ... ... ...
2019-10-17 22:45:00 4.864015    0.204483    -0.761609
2019-10-17 23:00:00 6.607717    0.204647    -0.761421
2019-10-17 23:15:00 7.466364    0.204932    -0.761108
2019-10-17 23:30:00 4.412779    0.204644    -0.760386
2019-10-17 23:45:00 0.998308    0.203039    -0.757979
1830 rows × 3 columns

注意:
  • 这个实现适用于Series和DataFrame输入
  • 这个实现适用于时间和整数窗口
  • fun1返回的结果甚至可以是一个列表、numpy数组、系列或字典
  • window_size只考虑最大窗口大小,因此所有起始索引在window_size以下的窗口都包括起始元素之前的所有元素。
  • 应该不将apply函数嵌套在rolling_apply函数中,因为pool.map不能接受本地或lambda函数,因为根据multiprocessing库,它们不能被“pickled”

超级棒的改进,救了我的一天。 - lucky6qi

0
您可以分别使用 rolling() 和 apply() 来获取多个列。只需从原始 Dataframe 创建一个 Rolling Dataframe,然后多次使用 .apply() 方法。
对于名为'df'的 Dataframe:
windows = df.rolling(window_size)
a_series = windows.apply(lambda x: find_a_for_single_window(x))
b_series = windows.apply(lambda x: find_b_for_single_window(x))
c_series = windows.apply(lambda x: find_c_for_single_window(x))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接