移动平均或滑动平均

298

是否有适用于 Python 的 SciPy 函数、NumPy 函数或模块可以计算给定特定窗口的一维数组的滑动平均值?


1
请注意,如果您在线构建数组,则问题陈述实际上变成了“如何最有效地维护一个向量,在末尾添加值并在开头弹出值”,因为您可以简单地维护一个平均值的单个累加器,每次输入一个值时添加新值并减去最旧的值,这在复杂度上是微不足道的。 - BjornW
以下除一条外,所有答案都没有回答问题:如何在添加新值时更新移动平均数,即“running”。我建议使用循环缓冲区,这样您就不必经常调整其大小,并且通过计算下一个平均值并了解上一个平均值和新值,可以更新下一个索引(模缓冲区大小)。简单的代数重排即可实现。 - D Left Adjoint to U
30个回答

4
这个问题比NeXuS上个月写的时候还要老,但我喜欢他的代码如何处理边缘情况。然而,由于它是“简单移动平均”,其结果滞后于应用于它们的数据。我认为,通过将类似的方法应用于基于卷积的convolution()方法,可以以比NumPy的模式validsamefull更令人满意的方式处理边缘情况。
我的贡献使用中心运行平均值来对齐其结果与它们的数据。当可用点数太少无法使用完整大小的窗口时,从数组边缘连续较小的窗口计算运行平均值。[实际上,是从连续较大的窗口计算,但这是一个实现细节。]
import numpy as np

def running_mean(l, N):
    # Also works for the(strictly invalid) cases when N is even.
    if (N//2)*2 == N:
        N = N - 1
    front = np.zeros(N//2)
    back = np.zeros(N//2)

    for i in range(1, (N//2)*2, 2):
        front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = 'valid')
    for i in range(1, (N//2)*2, 2):
        back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = 'valid')
    return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = 'valid'), back[::-1]])

由于使用了convolve(),所以它比较慢,如果有真正的Python专家来优化它,那么它很可能会变得更好,然而,我认为这个想法是可行的。


3

只使用Python标准库(占用内存更少)

这里提供另一种仅使用标准库deque的版本。让我惊讶的是,大多数答案都在使用pandasnumpy

def moving_average(iterable, n=3):
    d = deque(maxlen=n)
    for i in iterable:
        d.append(i)
        if len(d) == n:
            yield sum(d)/n

r = moving_average([40, 30, 50, 46, 39, 44])
assert list(r) == [40.0, 42.0, 45.0, 43.0]

实际上,我在Python文档中找到了另一种实现方式

def moving_average(iterable, n=3):
    # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable)
    d = deque(itertools.islice(it, n-1))
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

然而,实现似乎比我预期的要复杂一些。但是标准Python文档中一定有它存在的原因,请问有人能够对我的实现和标准文档进行评论吗?


3
你每次迭代时都对窗口成员求和,而他们能够高效地更新和(删除一个成员并添加另一个成员)是一个很大的区别。就计算复杂度而言,你的方法需要进行 O(n*d) 次计算(其中 d 是窗口的大小,n 是可迭代对象的大小),而他们只需要进行 O(n) 次计算。 - Iftah
@Iftah,很好,谢谢你的解释,你是正确的。 - MaThMaX

3

从阅读其他答案来看,我不认为这是问题所问的,但我需要保持一个正在增长的值列表的运行平均值。

因此,如果您想保留从某个地方(网站、测量设备等)获取的值列表,并更新最后n个值的平均值,您可以使用下面的代码,这将最小化添加新元素的工作量:

class Running_Average(object):
    def __init__(self, buffer_size=10):
        """
        Create a new Running_Average object.

        This object allows the efficient calculation of the average of the last
        `buffer_size` numbers added to it.

        Examples
        --------
        >>> a = Running_Average(2)
        >>> a.add(1)
        >>> a.get()
        1.0
        >>> a.add(1)  # there are two 1 in buffer
        >>> a.get()
        1.0
        >>> a.add(2)  # there's a 1 and a 2 in the buffer
        >>> a.get()
        1.5
        >>> a.add(2)
        >>> a.get()  # now there's only two 2 in the buffer
        2.0
        """
        self._buffer_size = int(buffer_size)  # make sure it's an int
        self.reset()

    def add(self, new):
        """
        Add a new number to the buffer, or replaces the oldest one there.
        """
        new = float(new)  # make sure it's a float
        n = len(self._buffer)
        if n < self.buffer_size:  # still have to had numbers to the buffer.
            self._buffer.append(new)
            if self._average != self._average:  # ~ if isNaN().
                self._average = new  # no previous numbers, so it's new.
            else:
                self._average *= n  # so it's only the sum of numbers.
                self._average += new  # add new number.
                self._average /= (n+1)  # divide by new number of numbers.
        else:  # buffer full, replace oldest value.
            old = self._buffer[self._index]  # the previous oldest number.
            self._buffer[self._index] = new  # replace with new one.
            self._index += 1  # update the index and make sure it's...
            self._index %= self.buffer_size  # ... smaller than buffer_size.
            self._average -= old/self.buffer_size  # remove old one...
            self._average += new/self.buffer_size  # ...and add new one...
            # ... weighted by the number of elements.

    def __call__(self):
        """
        Return the moving average value, for the lazy ones who don't want
        to write .get .
        """
        return self._average

    def get(self):
        """
        Return the moving average value.
        """
        return self()

    def reset(self):
        """
        Reset the moving average.

        If for some reason you don't want to just create a new one.
        """
        self._buffer = []  # could use np.empty(self.buffer_size)...
        self._index = 0  # and use this to keep track of how many numbers.
        self._average = float('nan')  # could use np.NaN .

    def get_buffer_size(self):
        """
        Return current buffer_size.
        """
        return self._buffer_size

    def set_buffer_size(self, buffer_size):
        """
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]

        Decreasing buffer size:
        >>> a.buffer_size = 6
        >>> a._buffer  # should not access this!!
        [9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        >>> a.buffer_size = 2
        >>> a._buffer
        [13.0, 14.0]

        Increasing buffer size:
        >>> a.buffer_size = 5
        Warning: no older data available!
        >>> a._buffer
        [13.0, 14.0]

        Keeping buffer size:
        >>> a = Running_Average(10)
        >>> for i in range(15):
        ...     a.add(i)
        ...
        >>> a()
        9.5
        >>> a._buffer  # should not access this!!
        [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        >>> a.buffer_size = 10  # reorders buffer!
        >>> a._buffer
        [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0]
        """
        buffer_size = int(buffer_size)
        # order the buffer so index is zero again:
        new_buffer = self._buffer[self._index:]
        new_buffer.extend(self._buffer[:self._index])
        self._index = 0
        if self._buffer_size < buffer_size:
            print('Warning: no older data available!')  # should use Warnings!
        else:
            diff = self._buffer_size - buffer_size
            print(diff)
            new_buffer = new_buffer[diff:]
        self._buffer_size = buffer_size
        self._buffer = new_buffer

    buffer_size = property(get_buffer_size, set_buffer_size)

你可以使用以下示例进行测试:

def graph_test(N=200):
    import matplotlib.pyplot as plt
    values = list(range(N))
    values_average_calculator = Running_Average(N/2)
    values_averages = []
    for value in values:
        values_average_calculator.add(value)
        values_averages.append(values_average_calculator())
    fig, ax = plt.subplots(1, 1)
    ax.plot(values, label='values')
    ax.plot(values_averages, label='averages')
    ax.grid()
    ax.set_xlim(0, N)
    ax.set_ylim(0, N)
    fig.show()

这给出了:

作为值的函数的值和平均值


3

移动平均滤波器怎么样?它也是一行代码,有一个优点,就是如果你需要的不是矩形窗口,而是N个简单移动平均值的数组a,那么你可以很容易地操纵窗口类型:

lfilter(np.ones(N)/N, [1], a)[N:]

应用三角形窗口后:

lfilter(np.ones(N)*scipy.signal.triang(N)/N, [1], a)[N:]

注意:通常我会将前N个样本丢弃,因此在末尾加上[N:],但这并非必要,只是个人选择的问题。

3

为了教育目的,让我添加两个Numpy解决方案(比cumsum解决方案慢):

import numpy as np
from numpy.lib.stride_tricks import as_strided

def ra_strides(arr, window):
    ''' Running average using as_strided'''
    n = arr.shape[0] - window + 1
    arr_strided = as_strided(arr, shape=[n, window], strides=2*arr.strides)
    return arr_strided.mean(axis=1)

def ra_add(arr, window):
    ''' Running average using add.reduceat'''
    n = arr.shape[0] - window + 1
    indices = np.array([0, window]*n) + np.repeat(np.arange(n), 2)
    arr = np.append(arr, 0)
    return np.add.reduceat(arr, indices )[::2]/window

Functions used: as_strided, add.reduceat


2

我的解决方案基于维基百科上的“简单移动平均线”。

from numba import jit
@jit
def sma(x, N):
    s = np.zeros_like(x)
    k = 1 / N
    s[0] = x[0] * k
    for i in range(1, N + 1):
        s[i] = s[i - 1] + x[i] * k
    for i in range(N, x.shape[0]):
        s[i] = s[i - 1] + (x[i] - x[i - N]) * k
    s = s[N - 1:]
    return s

与先前提出的解决方案相比,它比scipy中最快的解决方案“uniform_filter1d”快两倍,并且具有相同的误差顺序。 速度测试:
import numpy as np    
x = np.random.random(10000000)
N = 1000

from scipy.ndimage.filters import uniform_filter1d
%timeit uniform_filter1d(x, size=N)
95.7 ms ± 9.34 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit sma(x, N)
47.3 ms ± 3.42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

错误比较:

np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - uniform_filter1d(x, size=N, mode='constant', origin=-(N//2))[:-(N-1)]))
8.604228440844963e-14
np.max(np.abs(np.convolve(x, np.ones((N,))/N, mode='valid') - sma(x, N)))
1.41886502547095e-13

1

使用标准库和 deque 的另一种解决方案:

from collections import deque
import itertools

def moving_average(iterable, n=3):
    # http://en.wikipedia.org/wiki/Moving_average
    it = iter(iterable) 
    # create an iterable object from input argument
    d = deque(itertools.islice(it, n-1))  
    # create deque object by slicing iterable
    d.appendleft(0)
    s = sum(d)
    for elem in it:
        s += elem - d.popleft()
        d.append(elem)
        yield s / n

# example on how to use it
for i in  moving_average([40, 30, 50, 46, 39, 44]):
    print(i)

# 40.0
# 42.0
# 45.0
# 43.0

这段内容摘自Python collections.deque 文档 - Intrastellar Explorer

1

虽然这里已经有了解决方案,但请看看我的解决方案。它非常简单且运行良好。

import numpy as np
dataset = np.asarray([1, 2, 3, 4, 5, 6, 7])
ma = list()
window = 3
for t in range(0, len(dataset)):
    if t+window <= len(dataset):
        indices = range(t, t+window)
        ma.append(np.average(np.take(dataset, indices)))
else:
    ma = np.asarray(ma)

0
如果你需要对非常小的数组(少于200个元素)进行重复操作,我发现最快的结果是使用线性代数。最慢的部分是设置乘法矩阵y,这只需要做一次,但之后可能会更快。
import numpy as np
import random 

N = 100      # window size
size =200     # array length

x = np.random.random(size)
y = np.eye(size, dtype=float)

# prepare matrix
for i in range(size):
  y[i,i:i+N] = 1./N
  
# calculate running mean
z = np.inner(x,y.T)[N-1:]


-7

如果您选择自己编写代码而不是使用现有的库,请注意浮点误差并尽量将其影响降至最小:

class SumAccumulator:
    def __init__(self):
        self.values = [0]
        self.count = 0

    def add( self, val ):
        self.values.append( val )
        self.count = self.count + 1
        i = self.count
        while i & 0x01:
            i = i >> 1
            v0 = self.values.pop()
            v1 = self.values.pop()
            self.values.append( v0 + v1 )

    def get_total(self):
        return sum( reversed(self.values) )

    def get_size( self ):
        return self.count

如果您的所有值大致相同,那么这将有助于通过始终添加大致相似数量级的值来保持精度。

16
这个回答非常不清晰,至少在代码中加上一些注释或解释为什么这有助于浮点数误差会更好。 - Gabe
  1. 如果应用于原始问题,这将非常慢(计算平均值),因此这是不相关的。
  2. 要遭受64位数字精度问题的困扰,必须对近似相等的数字进行2^30次求和。
- Alleo
@Alleo:不是每个值做一次加法,而是做两次。证明与位翻转问题相同。然而,这个答案的重点不一定是性能,而是精度。平均64位值的内存使用量不会超过缓存中的64个元素,因此在内存使用方面也很友好。 - Mayur Patel
是的,你说得对,这比简单求和需要多2倍的操作,但原始问题是计算__滑动平均值__,而不仅仅是求和。可以用O(n)完成,但你的答案需要O(mn),其中m是窗口大小。 - Alleo
@Alleo:我还不清楚你的分析。从总和到平均值很容易,所以我们忽略它。当你在位置x有总和时,那么通过减去窗口中最左边的值并将新值添加到右边,你可以得到位置x+1处的总和。因此,如果您想要每个窗口N宽度上的m个相邻值的平均值,则使用我提供的代码应该花费O(N + m)。 - Mayur Patel
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接