Pandas使用多列进行滚动应用

37
我正在尝试在多列上使用pandas.DataFrame.rolling.apply()滚动函数。 Python版本是3.7,pandas版本是1.0.2。
import pandas as pd

#function to calculate
def masscenter(x):
    print(x); # for debug purposes
    return 0;

#simple DF creation routine
df = pd.DataFrame( [['02:59:47.000282', 87.60, 739],
                    ['03:00:01.042391', 87.51, 10],
                    ['03:00:01.630182', 87.51, 10],
                    ['03:00:01.635150', 88.00, 792],
                    ['03:00:01.914104', 88.00, 10]], 
                   columns=['stamp', 'price','nQty'])
df['stamp'] = pd.to_datetime(df2['stamp'], format='%H:%M:%S.%f')
df.set_index('stamp', inplace=True, drop=True)

'stamp'是单调且唯一的,'price'是双精度且不包含NaN,'nQty'是整数且也不包含NaN。
所以,我需要计算滚动的'质心',即sum(price*nQty)/sum(nQty)
我目前尝试过的方法是:
df.apply(masscenter, axis = 1)

masscenter会被调用5次,每次只有一行输入,输出结果如下:

price     87.6
nQty     739.0
Name: 1900-01-01 02:59:47.000282, dtype: float64

这是输入到masscenter的期望,因为我可以通过x[0],x[1]轻松访问pricenQty。然而,我在rolling.apply()上遇到了困难。 阅读文档 DataFrame.rolling()rolling.apply() 我认为在rolling()中使用'axis',在apply中使用'raw'可以实现类似的行为。一个天真的方法

rol = df.rolling(window=2)
rol.apply(masscenter)

逐行打印(逐渐增加行数,直到窗口大小)
stamp
1900-01-01 02:59:47.000282    87.60
1900-01-01 03:00:01.042391    87.51
dtype: float64

那么

stamp
1900-01-01 02:59:47.000282    739.0
1900-01-01 03:00:01.042391     10.0
dtype: float64

所以,列被分别传递给masscenter(预期)。
可悲的是,在文档中几乎没有关于'axis'的信息。然而,下一个变量显然是。
rol = df.rolling(window=2, axis = 1)
rol.apply(masscenter)

从未调用masscenter并引发rol.apply(..)中的ValueError
> Length of passed values is 1, index implies 5

我承认我对'axis'参数以及它的工作原理不太确定,因为缺乏文档。这是问题的第一部分: 这里发生了什么?如何正确使用'axis'?它的设计目的是什么? 当然,之前有过答案,具体如下: 如何将函数应用于pandas数据框的两列 它适用于整个DataFrame,而不是滚动操作。

如何使用多列参数调用pandas滚动应用函数
答案建议编写自己的滚动函数,但对我来说,与评论中提到的问题是一样的:如果需要使用偏移窗口大小(例如'1T')来处理非均匀时间戳,该怎么办?
我不喜欢从头开始重新发明轮子的想法。而且我想要使用pandas来完成所有操作,以避免pandas和“自制滚动函数”之间的不一致性。 对于这个问题,还有另一个答案,建议分别填充数据框并计算所需内容,但这种方法行不通:存储数据的大小将会非常庞大。 这里提出了相同的想法:
在pandas数据框上应用滚动函数,使用多个参数

另一个问题和答案在这里发布
Pandas-using-rolling-on-multiple-columns
这个答案很好,离我的问题最近,但是再次强调,没有办法使用偏移窗口大小(window = '1T')。
一些答案是在pandas 1.0发布之前提出的,鉴于文档可能会更好,我希望现在可以同时在多个列上进行滚动。
问题的第二部分是: 在pandas 1.0.x中,有没有可能使用偏移窗口大小同时在多个列上进行滚动?

masscenter只是一个将每个x映射为零的函数吗? - High GPA
df2['stamp'] 未定义,我有遗漏吗? - High GPA
1
@HighGPA,“masscenter”函数是这样构建的,以创建一个最小的可重现示例。您是否声明了columns=['stamp', 'price','nQty'] - Suthiro
6个回答

35

这个怎么样:

def masscenter(ser):
    print(df.loc[ser.index])
    return 0

rol = df.price.rolling(window=2)
rol.apply(masscenter, raw=False)

它使用滚动逻辑从任意列获取子集。raw=False选项为您提供了这些子集的索引值(以Series形式给出),然后您可以使用这些索引值从原始DataFrame中获取多列切片。


6
对于较大的数据集来说,这个解决方案非常昂贵。 - Harald Husum
1
你能详细说明一下你的观察吗? - adr
然后您可以使用这些索引值从原始数据帧中获取多列切片。您是指使用.iloc[i, c]masscenter内部访问数据帧吗?也就是说,masscenter没有直接需要的参数? - falsePockets
5
这对我来说可行,不过Pandas的开发人员应该想办法加以实施。 - w00dy
答案已经按照apply的文档要求,已经适当更新,将df作为参数传递给应用函数。 - undefined
显示剩余7条评论

20
你可以使用numpy_ext模块中的rolling_apply函数:

rolling_apply 函数来自numpy_ext 模块:

import numpy as np
import pandas as pd
from numpy_ext import rolling_apply


def masscenter(price, nQty):
    return np.sum(price * nQty) / np.sum(nQty)


df = pd.DataFrame( [['02:59:47.000282', 87.60, 739],
                    ['03:00:01.042391', 87.51, 10],
                    ['03:00:01.630182', 87.51, 10],
                    ['03:00:01.635150', 88.00, 792],
                    ['03:00:01.914104', 88.00, 10]], 
                   columns=['stamp', 'price','nQty'])
df['stamp'] = pd.to_datetime(df['stamp'], format='%H:%M:%S.%f')
df.set_index('stamp', inplace=True, drop=True)

window = 2
df['y'] = rolling_apply(masscenter, window, df.price.values, df.nQty.values)
print(df)

                            price  nQty          y
stamp                                             
1900-01-01 02:59:47.000282  87.60   739        NaN
1900-01-01 03:00:01.042391  87.51    10  87.598798
1900-01-01 03:00:01.630182  87.51    10  87.510000
1900-01-01 03:00:01.635150  88.00   792  87.993890
1900-01-01 03:00:01.914104  88.00    10  88.000000

谢谢,但唉!它也接受固定的窗口大小-2(或任何数字),但不是或其他所谓的偏移量。然而,你给了我一个想法。我会尝试一下,如果有效果,就会很快发布。 - Suthiro
如果您的数据不太稀疏,可以使用rolling_apply建议使用足够大的窗口来包含给定时间偏移记录,并在应用函数内部合并对时间戳的边界检查。您可能需要使用大窗口,但是rolling_apply执行并行作业的能力可能会弥补这一点。 - shaunc

6

参考@saninstein的精彩答案。

https://pypi.org/project/numpy-ext/安装numpy_ext。

import numpy as np
import pandas as pd
from numpy_ext import rolling_apply as rolling_apply_ext

def box_sum(a,b):
    return np.sum(a) + np.sum(b)

df = pd.DataFrame({"x": [1,2,3,4], "y": [1,2,3,4]})

window = 2
df["sum"] = rolling_apply_ext(box_sum, window , df.x.values, df.y.values)

输出:

print(df.to_string(index=False))
 x  y  sum
 1  1  NaN
 2  2  6.0
 3  3 10.0
 4  4 14.0

注意事项

  • 滚动函数适用于时间序列。默认情况下,它总是向后查看,因此数组中的6是现在和过去值的总和。
  • 在上面的示例中,将rolling_apply导入为rolling_apply_ext,以避免与Pandas rolling_apply的任何现有调用产生冲突(感谢@LudoSchmidt的评论)。

顺便说一句,我放弃了尝试使用Pandas。它本质上是有缺陷的:它可以很好地处理单列聚合和应用,但在尝试使用两个或更多列时,它变成了一个过于复杂的鲁比金伯格机器。


我使用了 "pip install" 并且使用了 "from numpy_ext import rolling_apply"。但是它破坏了我的脚本中的 pandas。请问您有类似的经验吗? - Ludo Schmidt
@LudoSchmidt 很好的建议。已更新上面的代码,将rolling_apply导入为rolling_apply_ext,因此一切都与Pandas中现有的rolling_apply调用向后兼容。 - Contango
5
我放弃尝试使用Pandas了,它本质上存在问题。 - Suthiro
在GroupBy上,你会怎么做这个操作? - Tom

4

要对数据框执行滚动窗口操作并访问所有列,您可以将method='table'传递给rolling()。例如:

import pandas as pd
import numpy as np
from numba import jit

df = pd.DataFrame({'a': [1, 2, 3, 4, 5, 6], 'b': [1, 3, 5, 7, 9, 11]})

@jit
def f(w):
    # we have access to both columns of the dataframe here
    return np.max(w), np.min(w)

df.rolling(3, method='table').apply(f, raw=True, engine='numba')

应该注意,method='table'需要安装numba引擎(pip install numba)。在示例中的@jit部分不是必须的,但有助于提高性能。上述示例代码的结果将是:
a b
NaN NaN
NaN NaN
5.0 1.0
7.0 2.0
9.0 3.0
11.0 4.0

1

因此我找不到在没有内置pandas函数的情况下滚动两列的方法。 代码如下。

# function to find an index corresponding
# to current value minus offset value
def prevInd(series, offset, date):
    offset = to_offset(offset)
    end_date = date - offset
    end = series.index.searchsorted(end_date, side="left")
    return end

# function to find an index corresponding
# to the first value greater than current
# it is useful when one has timeseries with non-unique
# but monotonically increasing values
def nextInd(series, date):
    end = series.index.searchsorted(date, side="right")
    return end

def twoColumnsRoll(dFrame, offset, usecols, fn, columnName = 'twoColRol'):
    # find all unique indices
    uniqueIndices = dFrame.index.unique()
    numOfPoints = len(uniqueIndices)
    # prepare an output array
    moving = np.zeros(numOfPoints)
    # nameholders
    price = dFrame[usecols[0]]
    qty   = dFrame[usecols[1]]

    # iterate over unique indices
    for ii in range(numOfPoints):
        # nameholder
        pp = uniqueIndices[ii]
        # right index - value greater than current
        rInd = afta.nextInd(dFrame,pp)
        # left index - the least value that 
        # is bigger or equal than (pp - offset)
        lInd = afta.prevInd(dFrame,offset,pp)
        # call the actual calcuating function over two arrays
        moving[ii] = fn(price[lInd:rInd], qty[lInd:rInd])
    # construct and return DataFrame
    return pd.DataFrame(data=moving,index=uniqueIndices,columns=[columnName])

这段代码可以运行,但相对较慢且效率低下。我想可以使用如何在多列中调用pandas.rolling.apply的参数中的numpy.lib.stride_tricks来加速。 不过,要么做到最好,要么就放弃——我最终编写了一个C++函数和其包装器。
我不想将其发布为答案,因为它只是一个解决方法,而且我也没有回答我的问题的任何部分,但是它太长了,不适合作为评论。

0

这个怎么样?

ggg = pd.DataFrame({"a":[1,2,3,4,5,6,7], "b":[7,6,5,4,3,2,1]})

def my_rolling_apply2(df, fun, window):
    prepend = [None] * (window - 1)
    end = len(df) - window
    mid = map(lambda start: fun(df[start:start + window]), np.arange(0,end))
    last =  fun(df[end:])
    return [*prepend, *mid, last]

my_rolling_apply2(ggg, lambda df: (df["a"].max(), df["b"].min()), 3)

结果是:

[None, None, (3, 5), (4, 4), (5, 3), (6, 2), (7, 1)]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接