使用numpy或numba优化Python中的for循环性能(可能性)。

3

我希望优化这个函数中 for 循环的性能。

import numpy as np
import random

def play_game(row, n=1000000):
    """Play the game! This game is a kind of random walk.

    Arguments:
        row (int[]): row index to use in the p matrix for each step in the
                     walk. Then length of this array is the same as n.

        n (int): number of steps in the random walk
    """
    p = np.array([[ 0.499,  0.499,  0.499],
                  [ 0.099,  0.749,  0.749]])
    X0 = 100
    Y0 = X0 % 3
    X = np.zeros(n)
    tempX = X0
    Y = Y0

    for j in range(n):
        tempX = X[j] = tempX + 2 * (random.random() < p.item(row.item(j), Y)) - 1
        Y = tempX % 3

    return np.r_[X0, X]

困难在于每个步骤都基于X的值计算Y的值,并且下一步要使用Y来更新X的值。

我想知道是否有一些Numpy技巧可以起到很大的作用。使用Numba是可以的(我尝试过,但没有太大的成功)。然而,我不想使用Cython。


如果你正在使用Python2,使用xrange()而不是range()可能会有一点帮助。 - davejagoda
我正在使用Python 3。 - Loïc Séguin-C.
1个回答

1
一个快速的观察告诉我们,在函数代码中迭代之间存在数据依赖性。现在,有不同类型的数据依赖性。你所看到的数据依赖性是索引依赖性,即任何迭代中的数据选择都取决于前一次迭代的计算。这种依赖关系似乎很难在迭代之间进行跟踪,因此本文并不是一个矢量化解决方案。相反,我们将尽可能预先计算在循环内使用的值。基本思想是在循环内尽量少做工作。
以下是如何进行预计算并实现更高效解决方案的简要说明:
  • 鉴于从输入的row中提取行元素时,p的形状相对较小,您可以使用p[row]预先选择所有这些行。

  • 在每次迭代中,您都会计算一个随机数。您可以用一个随机数组替换它,在循环之前设置好,因此,您也将预先计算这些随机值。

  • 基于到目前为止预先计算的值,您将拥有p中所有行的列索引。请注意,这些列索引将是一个大的ndarray,包含所有可能的列索引,而在我们的代码中,每次迭代只会选择一个基于计算。使用每次迭代的列索引,您将增加或减少X0以获得每次迭代的输出。

实现看起来像这样 -

randarr = np.random.rand(n)
p = np.array([[ 0.499,  0.419,  0.639],
              [ 0.099,  0.749,  0.319]])

def play_game_partvect(row,n,randarr,p):

    X0 = 100
    Y0 = X0 % 3

    signvals = 2*(randarr[:,None] < p[row]) - 1
    col_idx = (signvals + np.arange(3)) % 3

    Y = Y0
    currval = X0
    out = np.empty(n+1)
    out[0] = X0
    for j in range(n):
        currval = currval + signvals[j,Y]
        out[j+1] = currval
        Y = col_idx[j,Y]

    return out

为了验证与原始代码的一致性,您需要修改原始代码如下 -

def play_game(row,n,randarr,p):
    X0 = 100
    Y0 = X0 % 3
    X = np.zeros(n)
    tempX = X0
    Y = Y0
    for j in range(n):
        tempX = X[j] = tempX + 2 * (randarr[j] < p.item(row.item(j), Y)) - 1
        Y = tempX % 3
    return np.r_[X0, X]

请注意,由于此代码预计算了那些随机值,因此这已经比问题中的代码快了很多。
运行时测试和输出验证 -
In [2]: # Inputs
   ...: n = 1000
   ...: row = np.random.randint(0,2,(n))
   ...: randarr = np.random.rand(n)
   ...: p = np.array([[ 0.499,  0.419,  0.639],
   ...:               [ 0.099,  0.749,  0.319]])
   ...: 

In [3]: np.allclose(play_game_partvect(row,n,randarr,p),play_game(row,n,randarr,p))
Out[3]: True

In [4]: %timeit play_game(row,n,randarr,p)
100 loops, best of 3: 11.6 ms per loop

In [5]: %timeit play_game_partvect(row,n,randarr,p)
1000 loops, best of 3: 1.51 ms per loop

In [6]: # Inputs
   ...: n = 10000
   ...: row = np.random.randint(0,2,(n))
   ...: randarr = np.random.rand(n)
   ...: p = np.array([[ 0.499,  0.419,  0.639],
   ...:               [ 0.099,  0.749,  0.319]])
   ...: 

In [7]: np.allclose(play_game_partvect(row,n,randarr,p),play_game(row,n,randarr,p))
Out[7]: True

In [8]: %timeit play_game(row,n,randarr,p)
10 loops, best of 3: 116 ms per loop

In [9]: %timeit play_game_partvect(row,n,randarr,p)
100 loops, best of 3: 14.8 ms per loop

因此,我们看到了大约7.5倍以上的加速,不错!

在循环中不使用 col_idx,而是计算 Y = currval %3,可以使速度更快。此外,在 for 循环中,使用 .item() 比使用 [] 下标更快,因为返回的对象是 Python 标量而不是 numpy 标量,使用 Python 标量进行算术运算更快。 - Loïc Séguin-C.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接