在Python中加速Metropolis-Hastings算法

6

我有一些编程代码,用MCMC(Metropolis Hastings)对后验分布进行抽样。我使用scipy生成随机样本:

import numpy as np
from scipy import stats

def get_samples(n):
    """
    Generate and return a randomly sampled posterior.

    For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)

    :type n: int
    :param n: number of iterations

    :rtype: numpy.ndarray
    """
    x_t = stats.uniform(0,1).rvs() # initial value
    posterior = np.zeros((n,))
    for t in range(n):
        x_prime = stats.norm(loc=x_t).rvs() # candidate
        p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime) # prior * likelihood 
        p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t) # prior * likelihood 
        alpha = p1/p2 # ratio
        u = stats.uniform(0,1).rvs() # random uniform
        if u <= alpha:
            x_t = x_prime # accept
            posterior[t] = x_t
        elif u > alpha:
            x_t = x_t # reject
    posterior = posterior[np.where(posterior > 0)] # get rid of initial zeros that don't contribute to distribution
    return posterior

通常情况下,我尽量避免在Python中使用显式for循环 - 我会尝试使用纯numpy生成所有内容。然而,在这种算法的情况下,无法避免使用if语句的for循环。因此,代码速度相当慢。当我分析我的代码时,它大部分时间都花在for循环内(显然),更具体地说,最慢的部分是生成随机数; stats.beta().pdf()stats.norm().pdf()
有时我会使用numba来加速矩阵运算。虽然numba与某些numpy操作兼容,但生成随机数不属于其中之一。Numba有一个cuda rng,但这仅限于正态和均匀分布。
我的问题是,是否有一种方式可以使用各种分布的随机抽样来显着加速上述代码,并且与numba兼容?
我们不必局限于numba,但它是我知道的唯一易于使用的优化器。更普遍地说,我正在寻找加速Python中各种分布(beta、gamma、泊松)的随机抽样在for循环中的方法。

1
Numba支持相当多的随机分布。https://numba.pydata.org/numba-doc/dev/reference/numpysupported.html#numpy-random 我想你需要自己实现pdf方法(https://github.com/scipy/scipy/blob/v1.2.1/scipy/stats/_continuous_distns.py)。 - max9111
2
stats.beta().pdf()stats.norm().pdf()不会生成随机数。它们正在评估概率分布的密度。 - jwalton
1
顺便提一下,如果您查看scipy源代码,您会发现对scipy.special函数(xlog1py,betaln)的调用。这是在Numba中轻松使用它们的方法。https://github.com/numba/numba/issues/3086 - max9111
2个回答

16
在开始考虑numba等优化之前,您可以对此代码进行许多优化(仅通过算法实现的巧妙处理,我已经将此代码的速度提高了25倍)。
首先,在Metropolis-Hastings算法的实现中存在错误。您需要保留方案的每个迭代,而不管链是否移动。也就是说,您需要从代码中删除 posterior = posterior[np.where(posterior > 0)] ,并在每个循环的结尾处添加 posterior[t] = x_t
其次,这个示例似乎有点奇怪。通常,在这些推断问题中,我们希望根据一些观察结果来推断分布的参数。然而,在这里,分布的参数是已知的,而您要采样的是观察结果?无论如何,无论如何,我很乐意按照您的示例展示如何加快速度。
加速
为了开始,请从主for循环中删除与t值无关的任何内容。首先从for循环中删除随机行走创新的生成:
    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)
    for t in range(n):
        x_prime = x_t + innov[t]

当然,也可以将生成随机数u的过程从for循环中移出:
    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)

    u = np.random.uniform(size=n)
    for t in range(n):
        x_prime = x_t + innov[t]
        ...
        if u[t] <= alpha:

另一个问题是在每次循环中都计算当前后验概率,这是不必要的。每次循环中需要计算建议后验概率,当建议被接受时,您可以更新等于。
    x_t = stats.uniform(0,1).rvs()
    innov = stats.norm(loc=0).rvs(size=n)

    u = np.random.uniform(size=n)

    p2 = stats.beta(a=2,b=5).pdf(x_t)*stats.norm(loc=0,scale=2).pdf(x_t)
    for t in range(n):
        x_prime = x_t + innov[t]

        p1 = stats.beta(a=2,b=5).pdf(x_prime)*stats.norm(loc=0,scale=2).pdf(x_prime)
        ...
        if u[t] <= alpha:
            x_t = x_prime # accept
            p2 = p1

        posterior[t] = x_t

一个非常小的改进就是直接将scipy统计函数导入到命名空间中:

from scipy.stats import norm, beta

另一个非常微小的改进是注意到你代码中的elif语句没有起到任何作用,因此可以删除。

将所有内容综合起来,并使用更合理的变量名称,我得出了以下结果:

from scipy.stats import norm, beta
import numpy as np

def my_get_samples(n, sigma=1):

    x_cur = np.random.uniform()
    innov = norm.rvs(size=n, scale=sigma)
    u = np.random.uniform(size=n)

    post_cur = beta.pdf(x_cur, a=2, b=5) * norm.pdf(x_cur, loc=0, scale=2)

    posterior = np.zeros(n)
    for t in range(n):
        x_prop = x_cur + innov[t]

        post_prop = beta.pdf(x_prop, a=2, b=5) * norm.pdf(x_prop, loc=0, scale=2)
        alpha = post_prop / post_cur
        if u[t] <= alpha:
            x_cur = x_prop
            post_cur = post_prop

        posterior[t] = x_cur

    return posterior

现在,进行速度比较:

%timeit get_samples(1000)
3.19 s ± 5.28 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit my_get_samples(1000)
127 ms ± 484 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

这是速度提升了25倍。

ESS

值得注意的是,对于MCMC算法来说,光速度并不是一切。实际上,你所关心的是每秒从后验中可以获得的独立(近似)抽样数量。通常使用ESS(有效样本量)来评估。通过调整随机游走,您可以提高算法的效率(从而增加每秒绘制的有效样本数)。

为此,通常会进行初始试运行,即samples = my_get_samples(1000)。从此输出中计算sigma = 2.38**2 * np.var(samples)。然后应将此值用于调整方案中的随机游走,如innov = norm.rvs(size=n, scale=sigma)。看似任意的2.38^2出现的原因是:

随机行走Metropolis算法的弱收敛和最优缩放(1997)。A. Gelman,W.R. Gilks和G.O. Roberts。

为了说明调整可以带来的改进,让我们运行两次我的算法,一次调整过,另一次未调整,都使用10000次迭代:

x = my_get_samples(10000)
y = my_get_samples(10000, sigma=0.12)

fig, ax = plt.subplots(1, 2)
ax[0].hist(x, density=True, bins=25, label='Untuned algorithm', color='C0')
ax[1].hist(y, density=True, bins=25, label='Tuned algorithm', color='C1')
ax[0].set_ylabel('density')
ax[0].set_xlabel('x'), ax[1].set_xlabel('x')
fig.legend()

你可以立即看到调整对我们算法效率的改进。请记住,两次运行都是为相同的迭代次数而进行的。 enter image description here

最后的想法

如果你的算法收敛需要很长时间,或者你的样本具有大量自相关性,我建议尝试使用Cython来进一步提高速度优化。
我还建议查看PyStan项目。需要一些时间来适应,但它的NUTS HMC算法可能会比你手写的任何Metropolis-Hastings算法表现更好。

当然,返回的样本并不完全相同。生成随机数的顺序是完全不同的。 - jwalton
检查结果是否相同的方法是验证两个算法是否收敛到相同的后验分布。例如:y = my_get_samples(10000) plt.hist(x); plt.hist(y) - jwalton
1
不客气。在你研究numba之前,你应该先调整你的方案的随机漫步。我现在正在我的答案中添加更多信息。 - jwalton
1
@jwalton3141,虽然这不会对性能产生贡献,但在我们返回“后验概率”之前,可能需要再加入一行代码。我想要对样本进行稀疏处理以减少自相关性,并且只从1000个样本开始(burn in period)取样,以去除初始的随机样本:posterior = posterior[1000::4] # 从1000个样本开始(burn in),每隔4个样本取一个样本(稀疏处理) - PyRsquared
1
posterior[burn_in::M]可以实现您的要求。个人而言,除非存储空间有问题,我会从get_samples中返回完整的posterior样本,并使用轨迹图、自相关图等工具检查输出结果,然后再决定添加多少“Burn-in”期和抽稀率。请注意,这与PyRsquared建议从get_samples中返回posterior[burn_in::M]的方法不同--在这种情况下,我们将在未看到完整输出的情况下决定burn-in期和抽稀率。 - jwalton
显示剩余9条评论

0

不幸的是,我真的看不到任何加速随机分布的可能性,除非你自己在numba兼容的Python代码中重写它们。

但是,加速您的代码瓶颈的一个简单方法是将两个调用统计函数替换为一个调用:

p1, p2 = (
    stats.beta(a=2, b=5).pdf([x_prime, x_t])
    * stats.norm(loc=0, scale=2).pdf([x_prime, x_t]))

另一个微小的调整可能是使用以下代码将u的生成外包到for循环之外:

x_t = stats.uniform(0, 1).rvs() # initial value
posterior = np.zeros((n,))
u = stats.uniform(0, 1).rvs(size=n) # random uniform
for t in range(n):  # and so on

然后在循环中对u进行索引(当然,必须删除循环中的u = stats.uniform(0,1).rvs() # random uniform这一行):

if u[t] <= alpha:
    x_t = x_prime # accept
    posterior[t] = x_t
elif u[t] > alpha:
    x_t = x_t # reject

小的改动也可以简化if条件,例如省略elif语句或者如果需要用于其他目的,则将其替换为else。但这只是微小的改进:

if u[t] <= alpha:
    x_t = x_prime # accept
    posterior[t] = x_t

编辑

基于jwalton的答案,另一个改进:

def new_get_samples(n):
    """
    Generate and return a randomly sampled posterior.

    For simplicity, Prior is fixed as Beta(a=2,b=5), Likelihood is fixed as Normal(0,2)

    :type n: int
    :param n: number of iterations

    :rtype: numpy.ndarray
    """

    x_cur = np.random.uniform()
    innov = norm.rvs(size=n)
    x_prop = x_cur + innov
    u = np.random.uniform(size=n)

    post_cur = beta.pdf(x_cur, a=2,b=5) * norm.pdf(x_cur, loc=0,scale=2)
    post_prop = beta.pdf(x_prop, a=2,b=5) * norm.pdf(x_prop, loc=0,scale=2)

    posterior = np.zeros((n,))
    for t in range(n):        
        alpha = post_prop[t] / post_cur
        if u[t] <= alpha:
            x_cur = x_prop[t]
            post_cur = post_prop[t]
        posterior[t] = x_cur
    return posterior

随着时间的改进:

%timeit my_get_samples(1000)
# 187 ms ± 13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit my_get_samples2(1000)
# 1.55 ms ± 57.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

这是比jwalton的答案快121倍的加速。它通过外包post_prop计算来实现。

检查直方图,这似乎没问题。但最好问问jwalton是否真的没问题,他似乎对这个主题有更深入的理解。:)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接