NumPy中的Arnaud Legoux移动平均(ALMA)

5

我想用NumPy(或Pandas)编写计算Arnaud Legoux移动平均(ALMA)的向量化代码。你能帮我完成这个吗?谢谢。

非向量化版本如下所示(见下文)。

def NPALMA(pnp_array, **kwargs) :
    '''
    ALMA - Arnaud Legoux Moving Average,
    http://www.financial-hacker.com/trend-delusion-or-reality/
    https://github.com/darwinsys/Trading_Strategies/blob/master/ML/Features.py
    '''
    length = kwargs['length']
    # just some number (6.0 is useful)
    sigma = kwargs['sigma']
    # sensisitivity (close to 1) or smoothness (close to 0)
    offset = kwargs['offset']

    asize = length - 1
    m = offset * asize
    s = length  / sigma
    dss = 2 * s * s
    
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]

    return alma
2个回答

4

开始方法

我们可以沿着第一个轴创建滑动窗口,然后使用wtd值的范围进行张量乘法来进行求和约简。

实现大致如下 -

# Get all wtd values in an array
wtds = np.exp(-(np.arange(length) - m)**2/dss)

# Get the sliding windows for input array along first axis
pnp_array3D = strided_axis0(pnp_array,len(wtds))

# Initialize o/p array
out = np.zeros(pnp_array.shape)

# Get sum-reductions for the windows which don't need wrapping over
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]

# Last element of the output needed wrapping. So, do it separately.
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])

# Finally perform the divisions
out /= wtds.sum()

获取滑动窗口的函数:strided_axis0是来自这里

使用1D卷积提高性能

那些与wtds值相乘,然后进行求和约简的操作基本上是沿着第一个轴进行的卷积。因此,我们可以在axis=0上使用scipy.ndimage.convolve1d。由于内存效率很高,这样会快得多,因为我们不会创建巨大的滑动窗口。

实现方法如下:

from scipy.ndimage import convolve1d as conv

avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

因此,out[length-1:],即非零行与 avgs[:-length+1] 相同。

如果我们使用非常小的核数从 wtds 进行操作,则可能存在一些精度差异。因此,在使用此 convolution 方法时,请注意这一点。

运行时间测试

方法 -

def original_app(pnp_array, length, m, dss):
    alma = np.zeros(pnp_array.shape)
    wtd_sum = np.zeros(pnp_array.shape)

    for l in range(len(pnp_array)):
        if l >= asize:
            for i in range(length):
                im = i - m
                wtd = np.exp( -(im * im) / dss)
                alma[l] += pnp_array[l - length + i] * wtd
                wtd_sum[l] += wtd
            alma[l] = alma[l] / wtd_sum[l]
    return alma

def vectorized_app1(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    pnp_array3D = strided_axis0(pnp_array,len(wtds))
    out = np.zeros(pnp_array.shape)
    out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
    out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
    out /= wtds.sum()
    return out

def vectorized_app2(pnp_array, length, m, dss):
    wtds = np.exp(-(np.arange(length) - m)**2/dss)
    return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')

计时 -

In [470]: np.random.seed(0)
     ...: m,n = 1000,100
     ...: pnp_array = np.random.rand(m,n)
     ...: 
     ...: length = 6
     ...: sigma = 0.3
     ...: offset = 0.5
     ...: 
     ...: asize = length - 1
     ...: m = np.floor(offset * asize)
     ...: s = length  / sigma
     ...: dss = 2 * s * s
     ...: 

In [471]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop

In [472]: np.random.seed(0)
     ...: m,n = 10000,1000 # rest same as previous one

In [473]: %timeit original_app(pnp_array, length, m, dss)
     ...: %timeit vectorized_app1(pnp_array, length, m, dss)
     ...: %timeit vectorized_app2(pnp_array, length, m, dss)
     ...: 
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop

我已经检查了您使用一维卷积的第二种方法。看起来有些问题,但我无法确定具体是什么问题。 我的示例: pnp_array=np.array([ 3924.00752506 , 5774.30335369 , 5519.40734814 , 4931.71344059]) offset=0.85 sigma=6 length=3 m=1.7 dss=0.5预期结果应该是[0, 0, 5594.17030842, 5115.59420056]。但第二次应用返回[0, 0, 5693.3358598, 5333.61073335]。因此,累计误差为-317.182084168。 这是因为您提到的内核数量太小吗? - Prokhozhii
@Prokhozhii 我知道那些是正确的。我想知道它们的值,以便了解它们与 pnp_array 中的值相比如何。如果这些值要小得多,那么请使用 app#1。好的,它们看起来非常小,所以在这种情况下,请使用 app#1。这是我之前提到的同样的精度问题。 - Divakar
据我所知,应用程序#1存在索引错误。它返回[0, 0, 5199.97996566, 5594.17030842]。但第四个元素应该是第三个元素,而缺失的第五个元素应该是第四个元素。你能帮忙解决一下吗?谢谢。 - Prokhozhii
1
也许我错了(我是NumPy的新手),但是以下代码在我的情况下可以正常工作:def vectorized_app1(pnp_array, length, m, dss): wtds = np.exp( -(np.arange(length) - m)**2 / dss ) pnp_array3D = strided_axis0(pnp_array, len(wtds)) out = np.zeros(pnp_array.shape) out[length-1:] = np.tensordot( pnp_array3D , wtds , axes=( (1), (0) ) ) [:] out /= wtds.sum() return out - Prokhozhii
@Prokhozhii,“vectorized_app2”的输出与TradingView的输出并不完全匹配,即使只有小数点后两位。这种不匹配足以改变决策!相比之下,像EMA这样简单的东西确实非常匹配。 - Asclepius
显示剩余2条评论

1

在Divakar之前的回答中,没有一个与TradingView的ALMA v26.0产生的数字完全匹配。重要的是数字能够与参考值相匹配。考虑到这个目标,Pine Script v5实现的ta.alma可以作为一个参考,尽管它不是向量化的。

下面是一个使用NumPy的版本,它产生与TradingView相同的输出:

import numpy as np

def alma_np(prices: np.ndarray, window: int = 9, sigma: float = 6, offset: float = 0.85) -> np.ndarray:
    # Ref: https://dev59.com/cKbja4cB1Zd3GeqPeVLK#76031340/
    m = offset * (window - 1)
    s = window / sigma
    i = np.arange(window)
    weights = np.exp(-1 * np.square(i - m) / (2 * np.square(s)))
    norm_weights = weights / np.sum(weights)
    padded_prices = np.pad(prices, (window - 1, 0), mode='edge')
    alma_values = np.convolve(padded_prices, norm_weights[::-1], mode='valid')
    return alma_values

这里是一个使用上述NumPy版本的Pandas版本。
def alma_pd(prices: pd.Series, window: int = 9, *, sigma: float = 6, offset: float = 0.85) -> pd.Series:
    # Ref: https://dev59.com/cKbja4cB1Zd3GeqPeVLK#76031340/
    prices_series = prices
    prices = prices.to_numpy()
    alma_values = alma_np(prices, window, sigma=sigma, offset=offset)
    alma_series = pd.Series(alma_values, index=prices_series.index)
    return alma_series

通过2023-04-14的$SPY的5分钟数据进行交叉核对,匹配到小数点后第五位。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接