如何在Python 3中计算移动平均值?

11

假设我有一个列表:

y = ['1', '2', '3', '4','5','6','7','8','9','10']
我想创建一个函数来计算移动n日平均值。例如,如果n为5,我希望我的代码计算前1-5天的和并找到平均值,这将是3.0,然后继续计算2-6天,找到平均值,这将是4.0,然后是3-7,4-8,5-9和6-10天。
我不想计算前n-1天,所以从第n天开始,它将计算前面的天数。
def moving_average(x:'list of prices', n):
    for num in range(len(x)+1):
        print(x[num-n:num])

看起来这正是我想要的打印结果:

[]
[]
[]
[]
[]

['1', '2', '3', '4', '5']

['2', '3', '4', '5', '6']

['3', '4', '5', '6', '7']

['4', '5', '6', '7', '8']

['5', '6', '7', '8', '9']

['6', '7', '8', '9', '10']

然而,我不知道如何计算那些列表中的数字。有任何想法吗?


6
为什么列表中有字符串而不是数字? - Lev Levitsky
5个回答

23

旧版Python文档中有一个非常好用的滑动窗口生成器,它包含在itertools示例中

from itertools import islice

def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

使用移动平均值非常简单:

from __future__ import division  # For Python 2

def moving_averages(values, size):
    for selection in window(values, size):
        yield sum(selection) / size

将输入数据映射为整数后,对其运行此操作,结果如下所示:
>>> y= ['1', '2', '3', '4','5','6','7','8','9','10']
>>> for avg in moving_averages(map(int, y), 5):
...     print(avg)
... 
3.0
4.0
5.0
6.0
7.0
8.0

为了让“不完整”集合的前n-1次迭代返回None,只需稍微扩展moving_averages函数:
def moving_averages(values, size):
    for _ in range(size - 1):
        yield None
    for selection in window(values, size):
        yield sum(selection) / size

1
我希望结果是[无,无,无,无,3.0,4.0,5.0,6.0,7.0,8.0]。 - Kara
虽然我欣赏你优雅的解决方案,但我将其与一种跟踪运行总和而不是多次重新计算总和的解决方案进行了比较。请参见我的答案。如果该函数简化为仅回答原始问题并且不允许附加参数,则可能会更快。 - cfi

7

虽然我喜欢Martijn的回答,但像george一样,我想知道是否可以通过使用运行总和而不是在大多数相同数字上反复应用sum()来加快速度。

此外,在斜坡上升阶段默认使用None值的想法很有趣。实际上,可能会有许多不同的情况可以构思移动平均值。让我们将平均数的计算分成三个阶段:

  1. 斜坡上升:当前迭代次数<窗口大小的起始迭代
  2. 稳步进展:我们有正好窗口大小的元素可用于计算正常的average := sum(x[iteration_counter-window_size:iteration_counter])/window_size
  3. 斜坡下降:在输入数据的末尾,我们可以返回另一个window_size - 1个“平均”数字。

这是一个接受以下内容的函数:

  • 任意可迭代对象(生成器也可以)作为数据输入
  • 任意窗口大小>=1
  • 在斜坡上升/下降期间打开/关闭值的生产的参数
  • 回调函数用于控制如何生成值。这可以用于不断提供默认值(例如None)或提供部分平均值

以下是代码:

from collections import deque 

def moving_averages(data, size, rampUp=True, rampDown=True):
    """Slide a window of <size> elements over <data> to calc an average

    First and last <size-1> iterations when window is not yet completely
    filled with data, or the window empties due to exhausted <data>, the
    average is computed with just the available data (but still divided
    by <size>).
    Set rampUp/rampDown to False in order to not provide any values during
    those start and end <size-1> iterations.
    Set rampUp/rampDown to functions to provide arbitrary partial average
    numbers during those phases. The callback will get the currently
    available input data in a deque. Do not modify that data.
    """
    d = deque()
    running_sum = 0.0

    data = iter(data)
    # rampUp
    for count in range(1, size):
        try:
            val = next(data)
        except StopIteration:
            break
        running_sum += val
        d.append(val)
        #print("up: running sum:" + str(running_sum) + "  count: " + str(count) + "  deque: " + str(d))
        if rampUp:
            if callable(rampUp):
                yield rampUp(d)
            else:
                yield running_sum / size

    # steady
    exhausted_early = True
    for val in data:
        exhausted_early = False
        running_sum += val
        #print("st: running sum:" + str(running_sum) + "  deque: " + str(d))
        yield running_sum / size
        d.append(val)
        running_sum -= d.popleft()

    # rampDown
    if rampDown:
        if exhausted_early:
            running_sum -= d.popleft()
        for (count) in range(min(len(d), size-1), 0, -1):
            #print("dn: running sum:" + str(running_sum) + "  deque: " + str(d))
            if callable(rampDown):
                yield rampDown(d)
            else:
                yield running_sum / size
            running_sum -= d.popleft()

这个版本似乎比Martijn的版本快一点 - 虽然Martijn的版本更加优雅。这是测试代码:

print("")
print("Timeit")
print("-" * 80)

from itertools import islice
def window(seq, n=2):
    "Returns a sliding window (of width n) over data from the iterable"
    "   s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...                   "
    it = iter(seq)
    result = tuple(islice(it, n))
    if len(result) == n:
        yield result    
    for elem in it:
        result = result[1:] + (elem,)
        yield result

# Martijn's version:
def moving_averages_SO(values, size):
    for selection in window(values, size):
        yield sum(selection) / size


import timeit
problems = [int(i) for i in (10, 100, 1000, 10000, 1e5, 1e6, 1e7)]
for problem_size in problems:
    print("{:12s}".format(str(problem_size)), end="")

    so = timeit.repeat("list(moving_averages_SO(range("+str(problem_size)+"), 5))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages_SO")
    print("{:12.3f} ".format(min(so)), end="")

    my = timeit.repeat("list(moving_averages(range("+str(problem_size)+"), 5, False, False))", number=1*max(problems)//problem_size,
                       setup="from __main__ import moving_averages")
    print("{:12.3f} ".format(min(my)), end="")

    print("")

输出结果:

Timeit
--------------------------------------------------------------------------------
10                 7.242        7.656 
100                5.816        5.500 
1000               5.787        5.244 
10000              5.782        5.180 
100000             5.746        5.137 
1000000            5.745        5.198 
10000000           5.764        5.186 

现在可以通过这个函数调用来解决原始问题:

print(list(moving_averages(range(1,11), 5,
                           rampUp=lambda _: None,
                           rampDown=False)))

输出结果:
[None, None, None, None, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

2
避免重新计算中间总和的方法。
list=range(0,12)
def runs(v):
 global runningsum
 runningsum+=v
 return(runningsum)
runningsum=0
runsumlist=[ runs(v) for v in list ]
result = [ (runsumlist[k] - runsumlist[k-5])/5 for k in range(0,len(list)+1)]

打印结果

[2,3,4,5,6,7,8,9]

将运行(int(v))的结果转换为字符串,然后计算(runsumlist[k] - runsumlist[k-5])/5的值。

如果你想在程序中使用数字和字符串,请使用Alt方法而不是全局变量。


list = [float[x] for x in range(0,12)]
nave = 5
movingave = sum(list[:nave]/nave)
for i in range(len(list)-nave):movingave.append(movingave[-1]+(list[i+nave]-list[i])/nave)
print movingave 

即使您输入的值是整数,也要进行浮点数计算。

[2.0,3.0,4.0,5.0,6.0,7.0,8.0,9,0]

1
确实,一个运行总和算法更快。我已经发布了一个回答证明了你的观点。这里根本不需要使用global变量。 - cfi

1
使用summap函数。
print(sum(map(int, x[num-n:num])))

Python 3中的map函数基本上是这个函数的惰性版本:
[int(i) for i in x[num-n:num]]

我相信你能猜出 sum 函数的作用。


0

还有另一个解决方案,可以扩展 itertools 中的配对函数 pairwise()。您可以将其扩展为 nwise(),这将提供滑动窗口功能(并且在可迭代对象是生成器的情况下也适用):

def nwise(iterable, n):
    ts = it.tee(iterable, n)
    for c, t in enumerate(ts):
        next(it.islice(t, c, c), None)
    return zip(*ts)

def moving_averages_nw(iterable, n):
    yield from (sum(x)/n for x in nwise(iterable, n))

>>> list(moving_averages_nw(range(1, 11), 5))
[3.0, 4.0, 5.0, 6.0, 7.0, 8.0]

虽然对于短的iterable而言,设置成本相对较高,但随着数据集越长,这种成本的影响会减少。这里使用了sum()函数,代码相当优雅:

Timeit              MP           cfi         *****
--------------------------------------------------------------------------------
10                 4.658        4.959        7.351 
100                5.144        4.070        4.234 
1000               5.312        4.020        3.977 
10000              5.317        4.031        3.966 
100000             5.508        4.115        4.087 
1000000            5.526        4.263        4.202 
10000000           5.632        4.326        4.242 

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接