如何使用Python + NumPy / SciPy计算滚动/移动平均值?

230

似乎没有简单地在numpy/scipy上计算移动平均的函数,导致需要使用繁琐的卷积方案

我的问题有两个:

  • 使用numpy正确实现移动平均的最简单方法是什么?
  • 既然这似乎非常棘手且容易出错,那么在这种情况下不采用Python标准库哲学的好理由是什么?

3
可能是移动平均或滑动平均的重复问题。 - pppery
19个回答

257

如果你只想要一个简单的非加权移动平均值,你可以使用np.cumsum轻松实现它,这比基于FFT的方法更快:

编辑 代码中Bean发现了一个错位索引错误并进行了更正。编辑

def moving_average(a, n=3) :
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return ret[n - 1:] / n

>>> a = np.arange(20)
>>> moving_average(a)
array([  1.,   2.,   3.,   4.,   5.,   6.,   7.,   8.,   9.,  10.,  11.,
        12.,  13.,  14.,  15.,  16.,  17.,  18.])
>>> moving_average(a, n=4)
array([  1.5,   2.5,   3.5,   4.5,   5.5,   6.5,   7.5,   8.5,   9.5,
        10.5,  11.5,  12.5,  13.5,  14.5,  15.5,  16.5,  17.5])

所以我想答案是:实现起来真的很容易,而且numpy也许已经有一些过于专业化的功能了。


9
感谢您检查错误,现在看起来已经正常工作了。至于投票,我猜想答案背后的总体想法比实现中的单个偏差更重要,但谁知道呢。 - Jaime
4
我已经找到了问题。ret[n:] -= ret[:-n]ret[n:] = ret[n:] - ret[:-n] 不是相同的。我已经在这个答案中修复了代码。编辑:不,有人比我更快地解决了它。 - Timmmm
8
@Timmmm,我确实这样做了,那确实是问题所在。这个答案背后的一般原则在图像处理中被广泛使用(它们称之为累加和表),因此问题肯定出在实现上。这是过早优化的一个很好的例子,因为我有点记得要进行原地操作“因为它会更有效率”。但好的一面是,它可能更快地产生了错误的答案... - Jaime
64
嗯,看起来这个“易于实现”的函数实际上很容易出错,并引发了有关内存效率的良好讨论。如果可以确保某事做得正确,我宁愿接受代码臃肿的情况。 - Richard
3
这个答案是否需要更新,以包含在numpy v1.20中引入的numpy.lib.stride_tricks.sliding_window_view?解决方案变为一行代码:sliding_window_view(array, window_size).mean(axis=-1)。速度可能与cumsum相当,但我不确定百分之百。 - FirefoxMetzger
显示剩余17条评论

247
实现这个的简单方法是使用np.convolve。其背后的想法是利用计算离散卷积的方式,并将其用于返回滚动均值。这可以通过与长度等于我们想要的滑动窗口长度的np.ones序列进行卷积来实现。
为此,我们可以定义以下函数:
def moving_average(x, w):
    return np.convolve(x, np.ones(w), 'valid') / w

这个函数将对序列x和长度为w的全1序列进行卷积。请注意,所选的modevalid,因此卷积乘积仅在序列完全重叠的点处给出。


一些例子:

x = np.array([5,3,8,10,2,1,5,1,0,2])

对于窗口长度为2的移动平均,我们将有:

moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])

对于长度为4的窗口:

moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2.  ])

如何使用convolve函数?

让我们更深入地了解离散卷积的计算方式。 以下函数旨在复制np.convolve计算输出值的方式:

def mov_avg(x, w):
    for m in range(len(x)-(w-1)):
        yield sum(np.ones(w) * x[m:m+w]) / w 

同样的例子,使用{{上面提到的方法}}也会得出:

list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]

因此,在每个步骤中所做的是取一数组和当前窗口之间的内积。在这种情况下,由于我们直接取序列的sum,因此乘以np.ones(w)是多余的。

下面是一个示例,说明如何计算前几个输出,以便更清楚地理解。假设我们想要一个窗口大小为w=4

[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5

下面的输出将被计算为:

  [1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75

等等,一旦所有重叠都完成,返回序列的移动平均值。


2
这是一个不错的想法!对于小的n,它比@Jaime的答案更快,但对于较大的n则变慢。 - Felipe Gerard
9
有时候保持输出数组和输入数组大小相同是有用的。为此,可以将mode='valid'替换为'same'。但在这种情况下,边缘点会趋向于零。 - Ilia Barahovsky
在函数的数组'x'中,如果有一些元素可能为None或零,那么如何获取从该函数返回的相应'x'值?从该函数返回的数组大小可能小于提供给它的数组'x'的大小。 - Sun Bear
6
需要澄清的是,这种方法将产生“中心移动平均线”,而不是通常用于金融应用程序的“简单移动平均线”。https://en.wikipedia.org/wiki/Moving_average - CraigDavid
1
实际上,你只是在使用步长为1的平均池化,而不是真正的卷积。 - Anonymous

87
NumPy缺乏特定领域函数的原因可能是由于核心团队的纪律和忠诚于NumPy的主要指令:提供一个N维数组类型,以及用于创建和索引这些数组的函数。像许多基础目标一样,这个目标并不小,而NumPy做得非常出色。
(更大的)SciPy包含了更多的领域特定库(由SciPy开发人员称为子包),例如数值优化(optimize)、信号处理(signal)和积分微积分(integrate)。
我猜想你需要的函数至少在一个SciPy子包中(也许是scipy.signal);然而,我建议首先查找SciPy scikits的集合,确定相关的scikit(s),然后在那里寻找感兴趣的函数。 Scikits是基于NumPy/SciPy独立开发的软件包,针对特定的技术领域(例如scikits-imagescikits-learn等)。其中几个项目(特别是优化数字的强大的OpenOpt)在选择居住在相对较新的scikits体系结构之前,就已经是备受推崇和成熟的项目。上面链接到的Scikits主页列出了约30个这样的scikits,但至少其中几个已不再处于活跃开发状态。
按照这个建议,你会找到scikits-timeseries;然而,该软件包已不再处于活跃开发状态;实际上,Pandas已成为我所知道的事实上的基于NumPy的时间序列库。 Pandas有几个可用于计算移动平均值的函数;其中最简单的可能是rolling_mean,使用方法如下:
>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP

>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')

>>> # the data:
>>> x = NP.arange(0, t.shape[0])

>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)

现在,只需调用函数rolling_mean并传入Series对象和窗口大小,在下面的示例中为10天

>>> d_mva = PD.rolling_mean(D, 10)

>>> # d_mva is the same size as the original Series
>>> d_mva.shape
    (1096,)

>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
    2010-01-01         NaN
    2010-01-02         NaN
    2010-01-03         NaN

验证它是否有效——例如,比较原始系列中10-15的值与使用滚动均值平滑后的新系列

>>> D[10:15]
     2010-01-11    2.041076
     2010-01-12    2.041076
     2010-01-13    2.720585
     2010-01-14    2.720585
     2010-01-15    3.656987
     Freq: D

>>> d_mva[10:20]
      2010-01-11    3.131125
      2010-01-12    3.035232
      2010-01-13    2.923144
      2010-01-14    2.811055
      2010-01-15    2.785824
      Freq: D

rolling_mean函数以及其他大约十几个函数在Pandas文档中非正式地分组为“移动窗口”函数;Pandas中的第二个相关函数组被称为指数加权函数(例如ewma,计算指数移动加权平均)。这第二组函数没有包含在第一组(“移动窗口”函数)中,可能是因为指数加权变换不依赖于固定长度的窗口。


9
Pandas确实有强大的移动窗口函数,但对于简单移动平均来说,它似乎有点过于繁琐。 - Jaime
7
我觉得计算移动平均值不仅是对原帖作者有用的需求,对其他人来说也是如此。如果你需要计算移动平均值,几乎可以确定你有一个时间序列数据,这意味着你需要一个数据结构来使日期时间索引符合你的数据,这就是你所提到的“额外开销”。 - doug
3
首先,感谢您抽出时间撰写这篇非常信息丰富的答案。确实,我不能想象使用移动平均数却没有涉及时间序列的情况。但这并不意味着需要将其限制为日期时间格式,甚至特定的采样间隔(可能未知)。 - loopbackbee
3
只是想补充一下,如果 Pandas 作为依赖项过于笨重,移动平均函数已经被提取到 Bottleneck 库中。 - robochat
6
'rolling_mean'不再是pandas的一部分,请使用'rolling'代替。 - Vladtn
显示剩余4条评论

52

这里有多种方法可以做到这一点,以及一些基准测试。最好的方法是使用来自其他库的优化代码版本。 bottleneck.move_mean 方法可能是最全面的最佳选择。 scipy.convolve 方法也非常快速,可扩展,并且在语法和概念上都很简单,但是对于非常大的窗口值不具有良好的可扩展性。如果需要纯粹的numpy方法,则numpy.cumsum方法很好。

注意: 其中一些方法(例如bottleneck.move_mean)不是居中的,会改变您的数据。

import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time

def rollavg_direct(a,n): 
    'Direct "for" loop'
    assert n%2==1
    b = a*0.0
    for i in range(len(a)) :
        b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
    return b

def rollavg_comprehension(a,n):
    'List comprehension'
    assert n%2==1
    r,N = int(n/2),len(a)
    return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)]) 

def rollavg_convolve(a,n):
    'scipy.convolve'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]  

def rollavg_convolve_edges(a,n):
    'scipy.convolve, edge handling'
    assert n%2==1
    return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')  

def rollavg_cumsum(a,n):
    'numpy.cumsum'
    assert n%2==1
    cumsum_vec = np.cumsum(np.insert(a, 0, 0)) 
    return (cumsum_vec[n:] - cumsum_vec[:-n]) / n

def rollavg_cumsum_edges(a,n):
    'numpy.cumsum, edge handling'
    assert n%2==1
    N = len(a)
    cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0)) 
    d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))  
    return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d

def rollavg_roll(a,n):
    'Numpy array rolling'
    assert n%2==1
    N = len(a)
    rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
    return a[rolling_idx].mean(axis=0)[n-1:] 

def rollavg_roll_edges(a,n):
    # see https://dev59.com/pZ7ha4cB1Zd3GeqPqu_U
    'Numpy array rolling, edge handling'
    assert n%2==1
    a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
    m = a.shape[1]
    idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
    out = a[np.arange(-n//2,n//2)[:,None], idx]
    d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
    return (out.sum(axis=0)/d)[n//2:]

def rollavg_pandas(a,n):
    'Pandas rolling average'
    return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()

def rollavg_bottlneck(a,n):
    'bottleneck.move_mean'
    return bn.move_mean(a, window=n, min_count=1)

N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve, 
        rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges, 
        rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]

print('Small window (n=3)')
%load_ext memory_profiler
for f in functions : 
    print('\n'+f.__doc__+ ' : ')
    %timeit b=f(a,3)

print('\nLarge window (n=1001)')
for f in functions[0:-2] : 
    print('\n'+f.__doc__+ ' : ')
    %timeit b=f(a,1001)

print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] : 
    print('\n'+f.__doc__+ ' : ')
    %memit b=f(a,3)

print('\nLarge window (n=1001)')
for f in functions[2:-2] : 
    print('\n'+f.__doc__+ ' : ')
    %memit b=f(a,1001)

时间,小窗口(n=3)

Direct "for" loop : 

4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

scipy.convolve, edge handling : 
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum : 
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Numpy array rolling : 
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Numpy array rolling, edge handling : 
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

时间,大窗口(n=1001)

Direct "for" loop : 
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

List comprehension : 
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

scipy.convolve : 
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

scipy.convolve, edge handling : 
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

numpy.cumsum : 
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

numpy.cumsum, edge handling : 
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Pandas rolling average : 
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

bottleneck.move_mean : 
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

内存,小窗口(n=3)

The memory_profiler extension is already loaded. To reload it, use:
  %reload_ext memory_profiler

scipy.convolve : 
peak memory: 362.66 MiB, increment: 73.61 MiB

scipy.convolve, edge handling : 
peak memory: 510.24 MiB, increment: 221.19 MiB

numpy.cumsum : 
peak memory: 441.81 MiB, increment: 152.76 MiB

numpy.cumsum, edge handling : 
peak memory: 518.14 MiB, increment: 228.84 MiB

Pandas rolling average : 
peak memory: 449.34 MiB, increment: 160.02 MiB

bottleneck.move_mean : 
peak memory: 374.17 MiB, increment: 75.54 MiB

Numpy array rolling : 
peak memory: 661.29 MiB, increment: 362.65 MiB

Numpy array rolling, edge handling : 
peak memory: 1111.25 MiB, increment: 812.61 MiB

内存,大窗口(n=1001)

scipy.convolve : 
peak memory: 370.62 MiB, increment: 71.83 MiB

scipy.convolve, edge handling : 
peak memory: 521.98 MiB, increment: 223.18 MiB

numpy.cumsum : 
peak memory: 451.32 MiB, increment: 152.52 MiB

numpy.cumsum, edge handling : 
peak memory: 527.51 MiB, increment: 228.71 MiB

Pandas rolling average : 
peak memory: 451.25 MiB, increment: 152.50 MiB

bottleneck.move_mean : 
peak memory: 374.64 MiB, increment: 75.85 MiB

32

Numpy 1.20 开始,sliding_window_view 提供了一种滑动/滚动窗口的方式,以便遍历元素的窗口。您可以对这些窗口进行单独平均。

例如,对于一个包含 4 个元素的窗口:

from numpy.lib.stride_tricks import sliding_window_view

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])

请注意 sliding_window_view 的中间结果:

# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5,  3,  8, 10],
#        [ 3,  8, 10,  2],
#        [ 8, 10,  2,  1],
#        [10,  2,  1,  5],
#        [ 2,  1,  5,  1],
#        [ 1,  5,  1,  0],
#        [ 5,  1,  0,  2]])

3
请注意,虽然这种方法很简单,但它比此处列出的其他方法慢得多 - 在我的测试中,它比bottleneck.move_mean慢了约80倍,比numpy.cumsum慢了约15倍。 - Voy

11

这个使用Pandas的答案是基于上面的内容修改的,因为rolling_mean不再是Pandas的一部分。

# the recommended syntax to import pandas
import pandas as pd
import numpy as np

# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')

# the data:
x = np.arange(0, t.shape[0])

# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)

现在,只需在数据框上调用名为rolling的函数,并指定窗口大小即可。在下面的示例中,窗口大小为10天。
d_mva10 = D.rolling(10).mean()

# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]

2010-01-01    NaN
2010-01-02    NaN
2010-01-03    NaN
2010-01-04    NaN
2010-01-05    NaN
2010-01-06    NaN
2010-01-07    NaN
2010-01-08    NaN
2010-01-09    NaN
2010-01-10    4.5
2010-01-11    5.5
Freq: D, dtype: float64

10

我认为可以使用瓶颈轻松解决这个问题。

以下是基本示例:

import numpy as np
import bottleneck as bn

a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)

这将在每个轴上给出移动平均值。
  • “mm”是“a”的移动平均值。

  • “window”是要考虑的最大条目数,以计算移动平均值。

  • “min_count”是要考虑的最小条目数,以计算移动平均值(例如,对于第一个元素或数组具有nan值的情况)。

好的一面是Bottleneck有助于处理nan值,并且非常高效。


瓶颈是一个不错的快速易行的解决方案(点赞),但需要强调它只提供“滞后”的运行方式,而不是居中的方式,正如argentum2f所指出的那样。 - ClimateUnboxed

7

如果有人需要一个简单的解决方案,这里有一个。

def moving_average(a,n):
    N=len(a)
    return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])

您可以通过在 np.arange(0, N-n+1, step) 中添加步长参数来更改窗口之间的重叠。


3

如果您希望仔细处理边缘条件(仅从边缘可用元素计算平均值),则以下函数将解决问题。

import numpy as np

def running_mean(x, N):
    out = np.zeros_like(x, dtype=np.float64)
    dim_len = x.shape[0]
    for i in range(dim_len):
        if N%2 == 0:
            a, b = i - (N-1)//2, i + (N-1)//2 + 2
        else:
            a, b = i - (N-1)//2, i + (N-1)//2 + 1

        #cap indices to min and max indices
        a = max(0, a)
        b = min(dim_len, b)
        out[i] = np.mean(x[a:b])
    return out

>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])

>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])

3

所有答案似乎都集中在预先计算列表的情况上。对于实际运行的用例,其中数字逐个输入,这里提供了一个简单的类,提供了平均最后N个值的服务:

import numpy as np
class RunningAverage():
    def __init__(self, stack_size):
        self.stack = [0 for _ in range(stack_size)]
        self.ptr = 0
        self.full_cycle = False
    def add(self,value):
        self.stack[self.ptr] = value
        self.ptr += 1
        if self.ptr == len(self.stack):
            self.full_cycle = True
            self.ptr = 0
    def get_avg(self):
        if self.full_cycle:
            return np.mean(self.stack)
        else:
            return np.mean(self.stack[:self.ptr])

使用方法:

N = 50  # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
    value = <my computation>
    run_avg.add(value)
    if i % 20 ==0: # print once in 20 iters:
        print(f'the average value is {run_avg.get_avg()}')

2
“running”(实际上是指问题中的滚动/移动)并不是指流数据。它指的是一个沿着数据推进的移动窗口。这就是为什么答案集中在这方面的原因。 - Andras Deak -- Слава Україні
这是一个很棒的解决方案。我的论文感谢你! - Diesel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接