在Python中计算指数移动平均值

33

我有一组日期和每个日期的度量值。 我想为每个日期计算指数移动平均值。 有人知道如何做吗?

我是Python新手。 标准Python库似乎没有内置平均函数,这让我感到有些奇怪。 或许我没有找对地方。

因此,鉴于以下代码,我该如何计算日历日期智商点数的移动加权平均值?

from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]

(可能有更好的方式来构建这个数据,任何建议都将不胜感激)

16个回答

23

编辑: 似乎来自scikits.timeseries.lib.moving_funcs子模块的mov_average_expw()函数,来自SciKits(补充了SciPy的附加工具包),更符合您问题的措辞。


要使用平滑系数alpha(在维基百科的术语中为(1-alpha))计算数据的指数平滑

>>> alpha = 0.5
>>> assert 0 < alpha <= 1.0
>>> av = sum(alpha**n.days * iq 
...     for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq), 
...         sorted(zip(days, IQ), key=lambda p: p[0], reverse=True)))
95.0

以上代码看起来不太美观,我们对其进行一些重构:

from collections import namedtuple
from operator    import itemgetter

def smooth(iq_data, alpha=1, today=None):
    """Perform exponential smoothing with factor `alpha`.

    Time period is a day.
    Each time period the value of `iq` drops `alpha` times.
    The most recent data is the most valuable one.
    """
    assert 0 < alpha <= 1

    if alpha == 1: # no smoothing
        return sum(map(itemgetter(1), iq_data))

    if today is None:
        today = max(map(itemgetter(0), iq_data))

    return sum(alpha**((today - date).days) * iq for date, iq in iq_data)

IQData = namedtuple("IQData", "date iq")

if __name__ == "__main__":
    from datetime import date

    days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
    IQ = [110, 105, 90]
    iqdata = list(map(IQData, days, IQ))
    print("\n".join(map(str, iqdata)))

    print(smooth(iqdata, alpha=0.5))

示例:

$ python26 smooth.py
IQData(date=datetime.date(2008, 1, 1), iq=110)
IQData(date=datetime.date(2008, 1, 2), iq=105)
IQData(date=datetime.date(2008, 1, 7), iq=90)
95.0

嗨,J.F. Sebastian,我想使用这个EWMA公式在我的网站上显示趋势。我已经在SO上发布了一个问题——https://dev59.com/3Wox5IYBdhLWcg3wTytx。有人建议我使用EWMA算法,因为我需要更加强调最近的项目而不是旧的项目。由于我没有统计学方面的经验,我对如何计算“α”的值感到有些困惑。能帮帮忙吗?谢谢。 - Mridang Agarwalla
这些链接页面已经无法访问了,你能否更新一下它们? - sebix
@sebix:随意编辑。如果谷歌无法帮助,请尝试使用网络档案馆 - jfs
平滑因子是什么? - Kshitij Agrawal
@KshitijAgrawal:点击指数平滑链接以获得答案。 - jfs
sum(alpha**((today - date).days) * iq for date, iq in iq_data) 给出了指数加权和。它需要除以分母 sum(alpha**((today - date).days) for date, iq in iq_data) 才能得到平均值。 - matohak

16

我总是使用Pandas计算EMA:

这里有一个示例如何执行:

import pandas as pd
import numpy as np

def ema(values, period):
    values = np.array(values)
    return pd.ewma(values, span=period)[-1]

values = [9, 5, 10, 16, 5]
period = 5

print ema(values, period)

关于Pandas的EWMA更多信息:

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.ewma.html


4
Pandas的新版本是否有更新和更好的函数? - Cristian Ciupitu
1
s.ewm(span = 2/alpha-1).mean() 其中 s 是一个序列。 - user3226167
@user3226167 你怎么让 alpha = y? - luky
@luky alpha 意味着平滑系数。您是指如何从numpy数组创建s吗?s = pd.Series(y) - user3226167
@user3226167 不,我原以为“alpha”是变量X,但后来发现方程式已经整合到函数中了,只需要更改静态的alpha参数即可。 - luky

15

我通过谷歌搜索找到了以下示例代码 (http://osdir.com/ml/python.matplotlib.general/2005-04/msg00044.html):

def ema(s, n):
    """
    returns an n period exponential moving average for
    the time series s

    s is a list ordered from oldest (index 0) to most
    recent (index -1)
    n is an integer

    returns a numeric array of the exponential
    moving average
    """
    s = array(s)
    ema = []
    j = 1

    #get n sma first and calculate the next n period ema
    sma = sum(s[:n]) / n
    multiplier = 2 / float(1 + n)
    ema.append(sma)

    #EMA(current) = ( (Price(current) - EMA(prev) ) x Multiplier) + EMA(prev)
    ema.append(( (s[n] - sma) * multiplier) + sma)

    #now calculate the rest of the values
    for i in s[n+1:]:
        tmp = ( (i - ema[j]) * multiplier) + ema[j]
        j = j + 1
        ema.append(tmp)

    return ema

2
为什么函数使用与函数名称相同的局部变量?除了使代码稍微难以阅读外,它还可能在后续的代码中引入难以检测的逻辑错误... - Homunculus Reticulli
2
s = array(s)的意义是什么?在我注释掉它之前,我一直有语法错误。 - swdev
@chjortlund 我不确定你所说的“列表中的每个第二项都将是SMA”的意思。当前EMA值基于先前的值,但您必须从某个地方开始,因此SMA被视为集合的初始值。这是计算EMA的正确方法。 - Zuku
@Zuku 确实,我已经删除了我的评论。当我发表评论时,我正在寻找一种处理实时传入数据的算法,而上面的代码片段并不适用于该用例(也没有宣传)- 这是我的错误! - chjortlund

7

您也可以使用SciPy滤波器方法,因为EMA是IIR滤波器。相对于使用enumerate()方法,在大数据集上使用timeit进行测试,这将带来约64倍的速度优势。

import numpy as np
from scipy.signal import lfilter

x = np.random.normal(size=1234)
alpha = .1 # smoothing coefficient
zi = [x[0]] # seed the filter state with first value
# filter can process blocks of continuous data if <zi> is maintained
y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi)

6

我不熟悉Python,但是对于平均值部分,您是否指的是形式为指数衰减低通滤波器的方法?

y_new = y_old + (input - y_old)*alpha

其中alpha = dt/tau,dt为滤波器的时间步长,tau为滤波器的时间常数。如果使用可变时间步长形式,则将dt/tau剪切至不超过1.0。

y_new = y_old + (input - y_old)*dt/tau

如果你想过滤类似日期的内容,请确保将其转换为浮点数,例如从1970年1月1日开始的秒数。


5

我的Python有点生疏(如果我出现了语法问题,任何人都可以随意编辑此代码进行更正),但是这里有......

def movingAverageExponential(values, alpha, epsilon = 0):

   if not 0 < alpha < 1:
      raise ValueError("out of range, alpha='%s'" % alpha)

   if not 0 <= epsilon < alpha:
      raise ValueError("out of range, epsilon='%s'" % epsilon)

   result = [None] * len(values)

   for i in range(len(result)):
       currentWeight = 1.0

       numerator     = 0
       denominator   = 0
       for value in values[i::-1]:
           numerator     += value * currentWeight
           denominator   += currentWeight

           currentWeight *= alpha
           if currentWeight < epsilon: 
              break

       result[i] = numerator / denominator

   return result

此函数向后移动,从列表末尾到开头,通过向后计算指数移动平均值来为每个值计算加权系数,直到元素的加权系数小于给定的epsilon。

在函数结束时,它会将值反转后返回列表(以便按照调用者的要求正确排序)。

(附注:如果我使用的是Python之外的语言,我会先创建一个完整大小的空数组,然后倒序填充它,这样就不必在最后将其反转。但我认为在Python中无法声明一个大的空数组。而在Python列表中,附加操作比前置操作更少消耗,这就是我以相反的顺序构建列表的原因。如果我错了,请纠正我。)

'alpha'参数是每次迭代的衰减因子。例如,如果您使用0.5的alpha,则今天的移动平均值将由以下加权值组成:

today:        1.0
yesterday:    0.5
2 days ago:   0.25
3 days ago:   0.125
...etc...

当你有一个大量数值的数组时,十五天前或更早的值对今天的加权平均值的贡献将不会太大。"epsilon"参数可以让你设置一个截止点,低于这个点,你将不再关心旧值(因为它们对今天的值的贡献微不足道)。
你可以像这样调用函数:
result = movingAverageExponential(values, 0.75, 0.0001)

当非连续数据以不均匀时间间隔可用时,如何将其应用,例如在问题中:今天,5天前,6天前? - jfs
语法大部分正确,除了:'||' -> 'or','&&' -> 'and','list.length' -> 'len(list)',ifwhile附近的括号是不必要的。在Python中,您可以创建列表的副本:result = values[:]或创建一个大的“空”列表:result = [None]*len(values) - jfs
当 (alpha==1 或 epsilon==0) 时,您的算法是二次的。如果 len(values) 很大,M=log(epsilon)/log(alpha) 可能是一个很大的因子(内部循环执行的次数),所以我不会担心 values.reverse() -- 它只是对数据进行了一次额外的遍历。 - jfs
有一些算法可以在一次遍历中计算AWME(参见@earino答案的ema()和我的mov_average_expw())。 - jfs
关于非连续数据在非均匀时间间隔下的情况,我认为这个函数不应该处理这些情况。一个单独的包装类可以在这些数据之上提供插值服务。 - benjismith
显示剩余2条评论

5
在matplotlib.org的例子中(http://matplotlib.org/examples/pylab_examples/finance_work2.html),提供了一个使用numpy计算指数移动平均(EMA)函数的好例子。
def moving_average(x, n, type):
    x = np.asarray(x)
    if type=='simple':
        weights = np.ones(n)
    else:
        weights = np.exp(np.linspace(-1., 0., n))

    weights /= weights.sum()

    a =  np.convolve(x, weights, mode='full')[:len(x)]
    a[:n] = a[n]
    return a

3
import pandas_ta as ta

data["EMA3"] = ta.ema(data["close"], length=3)

pandas_ta 是一个技术分析库:https://github.com/twopirllc/pandas-ta。上述代码计算了一个序列的指数移动平均线(EMA)。你可以使用 'length' 指定滞后值,具体而言,上述代码计算了“3日 EMA”。


它是做什么的?你的代码输出是什么?你的描述在哪里?为什么它比其他解决方案更好? - Axisnix

3

可能是最短的:

#Specify decay in terms of span
#data_series should be a DataFrame

ema=data_series.ewm(span=5, adjust=False).mean()


3

我发现@earino提供的上述代码片段非常有用——但我需要能够持续平滑流式数值的内容,因此我对其进行了重构:

def exponential_moving_average(period=1000):
    """ Exponential moving average. Smooths the values in v over ther period. Send in values - at first it'll return a simple average, but as soon as it's gahtered 'period' values, it'll start to use the Exponential Moving Averge to smooth the values.
    period: int - how many values to smooth over (default=100). """
    multiplier = 2 / float(1 + period)
    cum_temp = yield None  # We are being primed

    # Start by just returning the simple average until we have enough data.
    for i in xrange(1, period + 1):
        cum_temp += yield cum_temp / float(i)

    # Grab the timple avergae
    ema = cum_temp / period

    # and start calculating the exponentially smoothed average
    while True:
        ema = (((yield ema) - ema) * multiplier) + ema

我是这样使用它的:

def temp_monitor(pin):
    """ Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """
    ema = exponential_moving_average()
    next(ema)  # Prime the generator

    while True:
        yield ema.send(val_to_temp(pin.read()))

(其中pin.read()产生我想要消费的下一个值)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接