似乎没有简单地在numpy/scipy上计算移动平均的函数,导致需要使用繁琐的卷积方案。
我的问题有两个:
- 使用numpy正确实现移动平均的最简单方法是什么?
- 既然这似乎非常棘手且容易出错,那么在这种情况下不采用Python标准库哲学的好理由是什么?
似乎没有简单地在numpy/scipy上计算移动平均的函数,导致需要使用繁琐的卷积方案。
我的问题有两个:
如果你只想要一个简单的非加权移动平均值,你可以使用np.cumsum
轻松实现它,这比基于FFT的方法更快:
编辑 代码中Bean发现了一个错位索引错误并进行了更正。编辑
def moving_average(a, n=3) :
ret = np.cumsum(a, dtype=float)
ret[n:] = ret[n:] - ret[:-n]
return ret[n - 1:] / n
>>> a = np.arange(20)
>>> moving_average(a)
array([ 1., 2., 3., 4., 5., 6., 7., 8., 9., 10., 11.,
12., 13., 14., 15., 16., 17., 18.])
>>> moving_average(a, n=4)
array([ 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5,
10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5])
所以我想答案是:实现起来真的很容易,而且numpy也许已经有一些过于专业化的功能了。
ret[n:] -= ret[:-n]
和 ret[n:] = ret[n:] - ret[:-n]
不是相同的。我已经在这个答案中修复了代码。编辑:不,有人比我更快地解决了它。 - Timmmmnumpy.lib.stride_tricks.sliding_window_view
?解决方案变为一行代码:sliding_window_view(array, window_size).mean(axis=-1)
。速度可能与cumsum
相当,但我不确定百分之百。 - FirefoxMetzgernp.convolve
。其背后的想法是利用计算离散卷积的方式,并将其用于返回滚动均值。这可以通过与长度等于我们想要的滑动窗口长度的np.ones
序列进行卷积来实现。def moving_average(x, w):
return np.convolve(x, np.ones(w), 'valid') / w
这个函数将对序列x
和长度为w
的全1序列进行卷积。请注意,所选的mode
是valid
,因此卷积乘积仅在序列完全重叠的点处给出。
一些例子:
x = np.array([5,3,8,10,2,1,5,1,0,2])
对于窗口长度为2
的移动平均,我们将有:
moving_average(x, 2)
# array([4. , 5.5, 9. , 6. , 1.5, 3. , 3. , 0.5, 1. ])
对于长度为4
的窗口:
moving_average(x, 4)
# array([6.5 , 5.75, 5.25, 4.5 , 2.25, 1.75, 2. ])
convolve
函数?让我们更深入地了解离散卷积的计算方式。
以下函数旨在复制np.convolve
计算输出值的方式:
def mov_avg(x, w):
for m in range(len(x)-(w-1)):
yield sum(np.ones(w) * x[m:m+w]) / w
同样的例子,使用{{上面提到的方法}}也会得出:
list(mov_avg(x, 2))
# [4.0, 5.5, 9.0, 6.0, 1.5, 3.0, 3.0, 0.5, 1.0]
因此,在每个步骤中所做的是取一数组和当前窗口之间的内积。在这种情况下,由于我们直接取序列的sum
,因此乘以np.ones(w)
是多余的。
下面是一个示例,说明如何计算前几个输出,以便更清楚地理解。假设我们想要一个窗口大小为w=4
:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*5 + 1*3 + 1*8 + 1*10) / w = 6.5
下面的输出将被计算为:
[1,1,1,1]
[5,3,8,10,2,1,5,1,0,2]
= (1*3 + 1*8 + 1*10 + 1*2) / w = 5.75
等等,一旦所有重叠都完成,返回序列的移动平均值。
mode='valid'
替换为'same'
。但在这种情况下,边缘点会趋向于零。 - Ilia Barahovsky>>> # the recommended syntax to import pandas
>>> import pandas as PD
>>> import numpy as NP
>>> # prepare some fake data:
>>> # the date-time indices:
>>> t = PD.date_range('1/1/2010', '12/31/2012', freq='D')
>>> # the data:
>>> x = NP.arange(0, t.shape[0])
>>> # combine the data & index into a Pandas 'Series' object
>>> D = PD.Series(x, t)
现在,只需调用函数rolling_mean并传入Series对象和窗口大小,在下面的示例中为10天。
>>> d_mva = PD.rolling_mean(D, 10)
>>> # d_mva is the same size as the original Series
>>> d_mva.shape
(1096,)
>>> # though obviously the first w values are NaN where w is the window size
>>> d_mva[:3]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
验证它是否有效——例如,比较原始系列中10-15的值与使用滚动均值平滑后的新系列
>>> D[10:15]
2010-01-11 2.041076
2010-01-12 2.041076
2010-01-13 2.720585
2010-01-14 2.720585
2010-01-15 3.656987
Freq: D
>>> d_mva[10:20]
2010-01-11 3.131125
2010-01-12 3.035232
2010-01-13 2.923144
2010-01-14 2.811055
2010-01-15 2.785824
Freq: D
rolling_mean函数以及其他大约十几个函数在Pandas文档中非正式地分组为“移动窗口”函数;Pandas中的第二个相关函数组被称为指数加权函数(例如ewma,计算指数移动加权平均)。这第二组函数没有包含在第一组(“移动窗口”函数)中,可能是因为指数加权变换不依赖于固定长度的窗口。
这里有多种方法可以做到这一点,以及一些基准测试。最好的方法是使用来自其他库的优化代码版本。 bottleneck.move_mean
方法可能是最全面的最佳选择。 scipy.convolve
方法也非常快速,可扩展,并且在语法和概念上都很简单,但是对于非常大的窗口值不具有良好的可扩展性。如果需要纯粹的numpy
方法,则numpy.cumsum
方法很好。
注意: 其中一些方法(例如bottleneck.move_mean
)不是居中的,会改变您的数据。
import numpy as np
import scipy as sci
import scipy.signal as sig
import pandas as pd
import bottleneck as bn
import time as time
def rollavg_direct(a,n):
'Direct "for" loop'
assert n%2==1
b = a*0.0
for i in range(len(a)) :
b[i]=a[max(i-n//2,0):min(i+n//2+1,len(a))].mean()
return b
def rollavg_comprehension(a,n):
'List comprehension'
assert n%2==1
r,N = int(n/2),len(a)
return np.array([a[max(i-r,0):min(i+r+1,N)].mean() for i in range(N)])
def rollavg_convolve(a,n):
'scipy.convolve'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float')/n, 'same')[n//2:-n//2+1]
def rollavg_convolve_edges(a,n):
'scipy.convolve, edge handling'
assert n%2==1
return sci.convolve(a,np.ones(n,dtype='float'), 'same')/sci.convolve(np.ones(len(a)),np.ones(n), 'same')
def rollavg_cumsum(a,n):
'numpy.cumsum'
assert n%2==1
cumsum_vec = np.cumsum(np.insert(a, 0, 0))
return (cumsum_vec[n:] - cumsum_vec[:-n]) / n
def rollavg_cumsum_edges(a,n):
'numpy.cumsum, edge handling'
assert n%2==1
N = len(a)
cumsum_vec = np.cumsum(np.insert(np.pad(a,(n-1,n-1),'constant'), 0, 0))
d = np.hstack((np.arange(n//2+1,n),np.ones(N-n)*n,np.arange(n,n//2,-1)))
return (cumsum_vec[n+n//2:-n//2+1] - cumsum_vec[n//2:-n-n//2]) / d
def rollavg_roll(a,n):
'Numpy array rolling'
assert n%2==1
N = len(a)
rolling_idx = np.mod((N-1)*np.arange(n)[:,None] + np.arange(N), N)
return a[rolling_idx].mean(axis=0)[n-1:]
def rollavg_roll_edges(a,n):
# see https://dev59.com/pZ7ha4cB1Zd3GeqPqu_U
'Numpy array rolling, edge handling'
assert n%2==1
a = np.pad(a,(0,n-1-n//2), 'constant')*np.ones(n)[:,None]
m = a.shape[1]
idx = np.mod((m-1)*np.arange(n)[:,None] + np.arange(m), m) # Rolling index
out = a[np.arange(-n//2,n//2)[:,None], idx]
d = np.hstack((np.arange(1,n),np.ones(m-2*n+1+n//2)*n,np.arange(n,n//2,-1)))
return (out.sum(axis=0)/d)[n//2:]
def rollavg_pandas(a,n):
'Pandas rolling average'
return pd.DataFrame(a).rolling(n, center=True, min_periods=1).mean().to_numpy()
def rollavg_bottlneck(a,n):
'bottleneck.move_mean'
return bn.move_mean(a, window=n, min_count=1)
N = 10**6
a = np.random.rand(N)
functions = [rollavg_direct, rollavg_comprehension, rollavg_convolve,
rollavg_convolve_edges, rollavg_cumsum, rollavg_cumsum_edges,
rollavg_pandas, rollavg_bottlneck, rollavg_roll, rollavg_roll_edges]
print('Small window (n=3)')
%load_ext memory_profiler
for f in functions :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[0:-2] :
print('\n'+f.__doc__+ ' : ')
%timeit b=f(a,1001)
print('\nMemory\n')
print('Small window (n=3)')
N = 10**7
a = np.random.rand(N)
%load_ext memory_profiler
for f in functions[2:] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,3)
print('\nLarge window (n=1001)')
for f in functions[2:-2] :
print('\n'+f.__doc__+ ' : ')
%memit b=f(a,1001)
时间,小窗口(n=3)
Direct "for" loop :
4.14 s ± 23.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
3.96 s ± 27.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
1.07 ms ± 26.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
scipy.convolve, edge handling :
4.68 ms ± 9.69 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum :
5.31 ms ± 5.11 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.52 ms ± 11.1 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.85 ms ± 9.63 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.3 ms ± 12.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Numpy array rolling :
31.3 ms ± 91.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Numpy array rolling, edge handling :
61.1 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
时间,大窗口(n=1001)
Direct "for" loop :
4.67 s ± 34 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
List comprehension :
4.46 s ± 14.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
scipy.convolve :
103 ms ± 165 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
scipy.convolve, edge handling :
272 ms ± 1.23 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
numpy.cumsum :
5.19 ms ± 12.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
numpy.cumsum, edge handling :
8.7 ms ± 11.5 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Pandas rolling average :
9.67 ms ± 199 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
bottleneck.move_mean :
1.31 ms ± 15.7 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
内存,小窗口(n=3)
The memory_profiler extension is already loaded. To reload it, use:
%reload_ext memory_profiler
scipy.convolve :
peak memory: 362.66 MiB, increment: 73.61 MiB
scipy.convolve, edge handling :
peak memory: 510.24 MiB, increment: 221.19 MiB
numpy.cumsum :
peak memory: 441.81 MiB, increment: 152.76 MiB
numpy.cumsum, edge handling :
peak memory: 518.14 MiB, increment: 228.84 MiB
Pandas rolling average :
peak memory: 449.34 MiB, increment: 160.02 MiB
bottleneck.move_mean :
peak memory: 374.17 MiB, increment: 75.54 MiB
Numpy array rolling :
peak memory: 661.29 MiB, increment: 362.65 MiB
Numpy array rolling, edge handling :
peak memory: 1111.25 MiB, increment: 812.61 MiB
内存,大窗口(n=1001)
scipy.convolve :
peak memory: 370.62 MiB, increment: 71.83 MiB
scipy.convolve, edge handling :
peak memory: 521.98 MiB, increment: 223.18 MiB
numpy.cumsum :
peak memory: 451.32 MiB, increment: 152.52 MiB
numpy.cumsum, edge handling :
peak memory: 527.51 MiB, increment: 228.71 MiB
Pandas rolling average :
peak memory: 451.25 MiB, increment: 152.50 MiB
bottleneck.move_mean :
peak memory: 374.64 MiB, increment: 75.85 MiB
从 Numpy 1.20
开始,sliding_window_view
提供了一种滑动/滚动窗口的方式,以便遍历元素的窗口。您可以对这些窗口进行单独平均。
例如,对于一个包含 4
个元素的窗口:
from numpy.lib.stride_tricks import sliding_window_view
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
np.average(sliding_window_view(values, window_shape = 4), axis=1)
# array([6.5, 5.75, 5.25, 4.5, 2.25, 1.75, 2])
请注意 sliding_window_view
的中间结果:
# values = np.array([5, 3, 8, 10, 2, 1, 5, 1, 0, 2])
sliding_window_view(values, window_shape = 4)
# array([[ 5, 3, 8, 10],
# [ 3, 8, 10, 2],
# [ 8, 10, 2, 1],
# [10, 2, 1, 5],
# [ 2, 1, 5, 1],
# [ 1, 5, 1, 0],
# [ 5, 1, 0, 2]])
这个使用Pandas的答案是基于上面的内容修改的,因为rolling_mean
不再是Pandas的一部分。
# the recommended syntax to import pandas
import pandas as pd
import numpy as np
# prepare some fake data:
# the date-time indices:
t = pd.date_range('1/1/2010', '12/31/2012', freq='D')
# the data:
x = np.arange(0, t.shape[0])
# combine the data & index into a Pandas 'Series' object
D = pd.Series(x, t)
rolling
的函数,并指定窗口大小即可。在下面的示例中,窗口大小为10天。d_mva10 = D.rolling(10).mean()
# d_mva is the same size as the original Series
# though obviously the first w values are NaN where w is the window size
d_mva10[:11]
2010-01-01 NaN
2010-01-02 NaN
2010-01-03 NaN
2010-01-04 NaN
2010-01-05 NaN
2010-01-06 NaN
2010-01-07 NaN
2010-01-08 NaN
2010-01-09 NaN
2010-01-10 4.5
2010-01-11 5.5
Freq: D, dtype: float64
我认为可以使用瓶颈轻松解决这个问题。
以下是基本示例:
import numpy as np
import bottleneck as bn
a = np.random.randint(4, 1000, size=(5, 7))
mm = bn.move_mean(a, window=2, min_count=1)
“mm”是“a”的移动平均值。
“window”是要考虑的最大条目数,以计算移动平均值。
“min_count”是要考虑的最小条目数,以计算移动平均值(例如,对于第一个元素或数组具有nan值的情况)。
好的一面是Bottleneck有助于处理nan值,并且非常高效。
如果有人需要一个简单的解决方案,这里有一个。
def moving_average(a,n):
N=len(a)
return np.array([np.mean(a[i:i+n]) for i in np.arange(0,N-n+1)])
您可以通过在 np.arange(0, N-n+1, step)
中添加步长参数来更改窗口之间的重叠。
如果您希望仔细处理边缘条件(仅从边缘可用元素计算平均值),则以下函数将解决问题。
import numpy as np
def running_mean(x, N):
out = np.zeros_like(x, dtype=np.float64)
dim_len = x.shape[0]
for i in range(dim_len):
if N%2 == 0:
a, b = i - (N-1)//2, i + (N-1)//2 + 2
else:
a, b = i - (N-1)//2, i + (N-1)//2 + 1
#cap indices to min and max indices
a = max(0, a)
b = min(dim_len, b)
out[i] = np.mean(x[a:b])
return out
>>> running_mean(np.array([1,2,3,4]), 2)
array([1.5, 2.5, 3.5, 4. ])
>>> running_mean(np.array([1,2,3,4]), 3)
array([1.5, 2. , 3. , 3.5])
所有答案似乎都集中在预先计算列表的情况上。对于实际运行的用例,其中数字逐个输入,这里提供了一个简单的类,提供了平均最后N个值的服务:
import numpy as np
class RunningAverage():
def __init__(self, stack_size):
self.stack = [0 for _ in range(stack_size)]
self.ptr = 0
self.full_cycle = False
def add(self,value):
self.stack[self.ptr] = value
self.ptr += 1
if self.ptr == len(self.stack):
self.full_cycle = True
self.ptr = 0
def get_avg(self):
if self.full_cycle:
return np.mean(self.stack)
else:
return np.mean(self.stack[:self.ptr])
使用方法:
N = 50 # size of the averaging window
run_avg = RunningAverage(N)
for i in range(1000):
value = <my computation>
run_avg.add(value)
if i % 20 ==0: # print once in 20 iters:
print(f'the average value is {run_avg.get_avg()}')