将numpy折扣计算向量化

4
在金融和强化学习中,一个常见的术语是基于原始奖励时间序列R[i]的折扣累积奖励C[i]。给定数组R,我们想要计算满足递归关系C[i] = R[i] + discount * C[i+1](其中C[-1] = R[-1])的C[i],并返回完整的C数组。
在python中,使用numpy数组计算这个值的一种数值稳定的方法可能是:
import numpy as np
def cumulative_discount(rewards, discount):
    future_cumulative_reward = 0
    assert np.issubdtype(rewards.dtype, np.floating), rewards.dtype
    cumulative_rewards = np.empty_like(rewards)
    for i in range(len(rewards) - 1, -1, -1):
        cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
        future_cumulative_reward = cumulative_rewards[i]
    return cumulative_rewards

但是,这依赖于Python循环。考虑到这是如此常见的计算,肯定有一些现有的利用其他标准函数的矢量化解决方案,而无需采用cython化。

请注意,任何使用类似np.power(discount,np.arange(len(rewards))的解决方案都不稳定。


@CrazyIvan 同意,我有点马虎,会修复的。 - VF1
@CrazyIvan 哦,那是另一个错误,你回答得很正确,但我没有问我想问的问题。对此感到抱歉。(我已经修改了问题;for循环仍然是应该矢量化的相同循环)。 - VF1
4个回答

13

你可以使用scipy.signal.lfilter来解决递归关系:

def alt(rewards, discount):
    """
    C[i] = R[i] + discount * C[i+1]
    signal.lfilter(b, a, x, axis=-1, zi=None)
    a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
                          - a[1]*y[n-1] - ... - a[N]*y[n-N]
    """
    r = rewards[::-1]
    a = [1, -discount]
    b = [1]
    y = signal.lfilter(b, a, x=r)
    return y[::-1]

这个脚本测试结果是否相同:

import scipy.signal as signal
import numpy as np

def orig(rewards, discount):
    future_cumulative_reward = 0
    cumulative_rewards = np.empty_like(rewards, dtype=np.float64)
    for i in range(len(rewards) - 1, -1, -1):
        cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
        future_cumulative_reward = cumulative_rewards[i]
    return cumulative_rewards

def alt(rewards, discount):
    """
    C[i] = R[i] + discount * C[i+1]
    signal.lfilter(b, a, x, axis=-1, zi=None)
    a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
                          - a[1]*y[n-1] - ... - a[N]*y[n-N]
    """
    r = rewards[::-1]
    a = [1, -discount]
    b = [1]
    y = signal.lfilter(b, a, x=r)
    return y[::-1]

# test that the result is the same
np.random.seed(2017)

for i in range(100):
    rewards = np.random.random(10000)
    discount = 1.01
    expected = orig(rewards, discount)
    result = alt(rewards, discount)
    if not np.allclose(expected,result):
        print('FAIL: {}({}, {})'.format('alt', rewards, discount))
        break

哇,这很棒。在这种特殊情况下,它是否归结为相同的数值计算? - VF1
是的,我相信如此。定义 lfilter 的 C 代码可以在这里找到 - unutbu
1
确实,这个函数 证实了这一点。(顺便说一句,在这些情况下,折扣通常小于1) - VF1
FYI,快速执行%timeit检查orig(np.ones(1000), 0.95)alt(np.ones(1000), 0.95)告诉我每个循环的时间分别为295 µs ± 6.29 µs(orig)和11.5 µs ± 28.7 ns(alt),因此该方法在性能方面表现良好。 - bluenote10

1
您所描述的计算方法称为霍纳规则或者霍纳方法来求解多项式。它在NumPy polynomial.polyval中实现。
但是,您想要整个cumulative_rewards列表,即所有Horner规则的中间步骤。NumPy方法不返回这些中间值。装饰有Numba的@jit的函数可能是最优的选择。
作为一种理论上的可能性,我将指出,如果给定一个Hankel矩阵的系数,polyval可以返回整个列表。这是矢量化的,但最终比Python循环效率低,因为每个cumulative_reward值都是独立计算的。
from numpy.polynomial.polynomial import polyval
from scipy.linalg import hankel

rewards = np.random.uniform(10, 100, size=(100,))
discount = 0.9
print(polyval(discount, hankel(rewards)))

这与

的输出相匹配。
print(cumulative_discount(rewards, discount))

好的,就像你所说的,“polyval”在这里不够用,因为我们对整个数组感兴趣。我理解你提到Numba,那么是否是否定了我的原始问题的答案? - VF1

0
如果您想要一个仅使用numpy的解决方案,请尝试这个(借鉴了unutbu答案中的结构):
def alt2(rewards, discount):
    tmp = np.arange(rewards.size)
    tmp = tmp - tmp[:, np.newaxis]
    w = np.triu(discount ** tmp.clip(min=0)).T
    return (rewards.reshape(-1, 1) * w).sum(axis=0)

以下是证明。

import numpy as np

def orig(rewards, discount):
    future_cumulative_reward = 0
    cumulative_rewards = np.empty_like(rewards, dtype=np.float64)
    for i in range(len(rewards) - 1, -1, -1):
        cumulative_rewards[i] = rewards[i] + discount * future_cumulative_reward
        future_cumulative_reward = cumulative_rewards[i]
    return cumulative_rewards

def alt2(rewards, discount):
    tmp = np.arange(rewards.size)
    tmp = tmp - tmp[:, np.newaxis]
    w = np.triu(discount ** tmp.clip(min=0)).T
    return (rewards.reshape(-1, 1) * w).sum(axis=0)

# test that the result is the same
np.random.seed(2017)

for i in range(100):
    rewards = np.random.random(100)
    discount = 1.01
    expected = orig(rewards, discount)
    result = alt2(rewards, discount)
    if not np.allclose(expected,result):
        print('FAIL: {}({}, {})'.format('alt', rewards, discount))
        break
else:
    print('success')

然而,这种解决方案在处理大型奖励数组时不太可扩展,但您仍然可以使用步幅技巧进行解决,如此处所指出


0

我想通过引入累积奖励的初始条件来扩展unutbu的出色解决方案。我希望C[-2]与C[-1](稳态)大致相等,而不是从R[-1]开始。以下是实现这一目标的方法:

import scipy.signal as signal

def alt(rewards, discount):
    """
    C[i] = R[i] + discount * C[i+1]
    signal.lfilter(b, a, x, axis=-1, zi=None)
    a[0]*y[n] = b[0]*x[n] + b[1]*x[n-1] + ... + b[M]*x[n-M]
                          - a[1]*y[n-1] - ... - a[N]*y[n-N]
    """
    r = rewards[::-1]
    a = [1, -discount]
    b = [1]
    zi = signal.lfilter_zi(b, a) * r[0]  # steady state when input is constant and equal to r[0]
    y = signal.lfilter(b, a, x=r, zi=zi)
    return y[::-1]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接