为什么numpy比for循环慢

3

更新:此功能现已移至sciPy.stats.qmc.discrepancy,并进行了Cython移植,并实现了并行化。


我有一个使用for循环的函数,我想使用numpy来提高速度。但是似乎这样做没有效果,因为numpy版本似乎慢了两倍。以下是代码:

import numpy as np
import itertools
import timeit

def func():
    sample = np.random.random_sample((100, 2))
    
    disc1 = 0
    disc2 = 0
    n_sample = len(sample)
    dim = sample.shape[1]

    for i in range(n_sample):
        prod = 1
        for k in range(dim):
            sub = np.abs(sample[i, k] - 0.5)
            prod *= 1 + 0.5 * sub - 0.5 * sub ** 2
    
        disc1 += prod

    for i, j in itertools.product(range(n_sample), range(n_sample)):
        prod = 1
        for k in range(dim):
            a = 0.5 * np.abs(sample[i, k] - 0.5)
            b = 0.5 * np.abs(sample[j, k] - 0.5)
            c = 0.5 * np.abs(sample[i, k] - sample[j, k])
            prod *= 1 + a + b - c
        disc2 += prod

    c2 = (13 / 12) ** dim - 2 / n_sample * disc1 + 1 / (n_sample ** 2) * disc2


def func_numpy():
    sample = np.random.random_sample((100, 2))

    disc1 = 0
    disc2 = 0
    n_sample = len(sample)
    dim = sample.shape[1]

    disc1 = np.sum(np.prod(1 + 0.5 * np.abs(sample - 0.5) - 0.5 * np.abs(sample - 0.5) ** 2, axis=1))
    
    for i, j in itertools.product(range(n_sample), range(n_sample)):
        disc2 += np.prod(1 + 0.5 * np.abs(sample[i] - 0.5) + 0.5 * np.abs(sample[j] - 0.5) - 0.5 * np.abs(sample[i] - sample[j]))
    
    c2 = (13 / 12) ** dim - 2 / n_sample * disc1 + 1 / (n_sample ** 2) * disc2


print('Normal function time: ' , timeit.repeat('func()', number=20, repeat=5, setup="from __main__ import func"))
print('numpy function time: ', timeit.repeat('func_numpy()', number=20, repeat=5, setup="from __main__ import func_numpy"))

计时输出为:
Normal function time:  [2.831496894999873, 2.832342429959681, 2.8009242500411347, 2.8075121529982425, 2.824807019031141]
numpy function time:  [5.154757721000351, 5.2011515340418555, 5.148996959964279, 5.095560318033677, 5.125199959962629]

我在这里缺少什么?我知道瓶颈在itertools部分,因为我之前有一个100x100x2的循环而不是一个100x2的循环。 你看到其他方法可以做到吗?


你检查过是否有不必要的广播操作了吗? - cs95
我该如何准确地检查? - tupui
1
尝试对您的代码进行分析,找出瓶颈所在。例如,在您的numpy版本中,您可以计算np.abs(sample - 0.5)一次并重复使用它,而不是计算两次... - sirfz
2
每个版本都有很多可以改进的地方。在将它们相互比较之前,优化每个版本的解决方案是非常必要的,因为这样比较才有意义。 - MSeifert
那个itertools.product似乎不是最numpy友好的方法。你在numpy内外来回切换,需要付出额外的开销,谁知道还有什么其他问题。 - Ignacio Vergara Kausel
显示剩余4条评论
2个回答

3
使用NumPy,我们必须将事物向量化,我们在这里肯定可以这样做。
仔细观察循环部分,我们正在沿着输入数据samples的第一个轴进行两次迭代。
for i, j in itertools.product(range(n_sample), range(n_sample)):

我们可以将这些迭代转换为矢量化操作,一旦让广播处理它们。
现在,要想有完全矢量化的解决方案,我们需要更多的内存空间,具体来说是(N,N,M),其中(N,M)是输入数据的形状。
这里还有一个值得注意的方面,即在每次迭代中,我们并没有做太多的工作,因为我们正在对每行进行操作,而每行仅包含给定样本的2个元素。因此,出现的想法是,我们可以沿着M运行循环,以便在每次迭代中计算prod和累加。因此,对于给定的样本,只需要两个循环迭代。
退出循环后,我们将拥有累积的prod,只需要将其求和为disc2,即可得到最终输出。
以下是实现上述想法的代码:
prod_arr = 1
for i in range(sample.shape[1]):
    si = sample[:,i]
    prod_arr *= 1 + 0.5 * np.abs(si[:,None] - 0.5) + 0.5 * np.abs(si - 0.5) - \
                                    0.5 * np.abs(si[:,None] - si)
disc2 = prod_arr.sum()

运行时测试

原方法中循环部分的简化版本以及修改后的方法如下:

def org_app(sample):
    disc2 = 0
    n_sample = len(sample)
    for i, j in itertools.product(range(n_sample), range(n_sample)):
        disc2 += np.prod(1 + 0.5 * np.abs(sample[i] - 0.5) + 0.5 * \
            np.abs(sample[j] - 0.5) - 0.5 * np.abs(sample[i] - sample[j]))
    return disc2


def mod_app(sample):
    prod_arr = 1
    for i in range(sample.shape[1]):
        si = sample[:,i]
        prod_arr *= 1 + 0.5 * np.abs(si[:,None] - 0.5) + 0.5 * np.abs(si - 0.5) - \
                                        0.5 * np.abs(si[:,None] - si)
    disc2 = prod_arr.sum()
    return disc2

定时和验证 -

In [10]: sample = np.random.random_sample((100, 2))

In [11]: org_app(sample)
Out[11]: 11934.878683659041

In [12]: mod_app(sample)
Out[12]: 11934.878683659068

In [14]: %timeit org_app(sample)
10 loops, best of 3: 84.4 ms per loop

In [15]: %timeit mod_app(sample)
10000 loops, best of 3: 94.6 µs per loop

关于900x倍的加速!这应该足够激励人们尽可能地进行向量化处理。


通常在使用多行表达式时,最好包括 () 而不是使用反斜杠。 - MSeifert
@MSeifert 在哪里进行换行? - Divakar
它正在从我的注释中删除反斜杠。但是我指的是以prod_arr *= 1 + 0.5 * ...开头的那一行。 - MSeifert
@MSeifert 不允许我在我的Spyder IDE上避免反斜杠。这是一个IDE的问题吗? - Divakar
@Divakar 奇怪,我在答案中包含的版本不起作用吗(我使用了(),没有反斜杠)。我的IDE不喜欢行末有反斜杠。:D - MSeifert
如果有人正在阅读此内容:我已经将该功能推送到了 scipy.stats.qmc.discrepancy 中。再次感谢您的帮助! - tupui

2
正如我在评论中提到的那样,您的解决方案并不是最优的,比较不理想的方法也没有什么意义。
首先,迭代或索引NumPy数组的单个元素非常慢。最近我回答了一个问题,包括很多细节(如果您感兴趣,可以看一下:"convert np array to a set takes too long")。因此,Python方法只需将array转换为list即可更快:
def func():
    sample = np.random.random_sample((100, 2))
    disc1 = 0
    n_sample = len(sample)
    dim = sample.shape[1]
    sample = sample.tolist()  # converted to list

    for i in range(n_sample):
        prod = 1
        for item in sample[i]:
            sub = abs(item - 0.5)
            prod *= 1 + 0.5 * sub - 0.5 * sub ** 2
        disc1 += prod

    disc2 = 0
    for i, j in itertools.product(range(n_sample), range(n_sample)):
        prod = 1
        for k in range(dim):
            a = 0.5 * abs(sample[i][k] - 0.5)
            b = 0.5 * abs(sample[j][k] - 0.5)
            c = 0.5 * abs(sample[i][k] - sample[j][k])
            prod *= 1 + a + b - c
        disc2 += prod

    c2 = (13 / 12) ** dim - 2 / n_sample * disc1 + 1 / (n_sample ** 2) * disc2

我还用正常的abs替换了np.abs调用。正常的abs开销更低!而且还改变了一些其他部分。最终,这比您原来的“正常”方法快了10-20倍以上。
我还没有时间检查NumPy方法,@Divarkar已经包含了一个非常好的和优化过的方法。比较这两种方法:
def func_numpy():
    sample = np.random.random_sample((100, 2))

    disc1 = 0
    disc2 = 0
    n_sample = len(sample)
    dim = sample.shape[1]

    disc1 = np.sum(np.prod(1 + 
                           0.5 * np.abs(sample - 0.5) - 
                           0.5 * np.abs(sample - 0.5) ** 2, 
                           axis=1))

    prod_arr = 1
    for i in range(sample.shape[1]):
        s0 = sample[:,i]
        prod_arr *= (1 + 
                     0.5 * np.abs(s0[:,None] - 0.5) + 
                     0.5 * np.abs(s0 - 0.5) - 
                     0.5 * np.abs(s0[:,None] - s0))
    disc2 = prod_arr.sum()

    c2 = (13 / 12) ** dim - 2 / n_sample * disc1 + 1 / (n_sample ** 2) * disc2


print('Normal function time: ' , 
      timeit.repeat('func()', number=20, repeat=3, setup="from __main__ import func"))
# Normal function time:  [1.4846746248249474, 1.5018398493266432, 1.5476674017127152]
print('numpy function time: ', 
      timeit.repeat('func_numpy()', number=20, repeat=3, setup="from __main__ import func_numpy"))
# numpy function time:  [0.020140038561976326, 0.016502230831292763, 0.016452520269695015]

因此,优化后的NumPy方法绝对可以击败“优化后”的Python方法。它快了近100倍。如果您想要更快的速度,您可以在稍微修改过的纯Python代码上使用
import numba as nb

@nb.njit
def func_numba():
    sample = np.random.random_sample((100, 2))
    disc1 = 0
    n_sample = len(sample)
    dim = sample.shape[1]

    for i in range(n_sample):
        prod = 1
        for item in sample[i]:
            sub = abs(item - 0.5)
            prod *= 1 + 0.5 * sub - 0.5 * sub ** 2
        disc1 += prod

    disc2 = 0
    for i in range(n_sample):
        for j in range(n_sample):
            prod = 1
            for k in range(dim):
                a = 0.5 * abs(sample[i,k] - 0.5)
                b = 0.5 * abs(sample[j,k] - 0.5)
                c = 0.5 * abs(sample[i,k] - sample[j,k])
                prod *= 1 + a + b - c
            disc2 += prod

    return (13 / 12) ** dim - 2 / n_sample * disc1 + 1 / (n_sample ** 2) * disc2

func_numba()


print('numba function time: ' , 
      timeit.repeat('func_numba()', number=20, repeat=3, setup="from __main__ import func_numba"))
# numba function time:  [0.003022848984983284, 0.0030429566279508435, 0.004060626777572907]

这几乎比NumPy方法快8-10倍。

@Y0da 我还提供了一种额外的方法,并与Divarkar的解决方案进行了时间比较。 - MSeifert
我不能为两个答案投票,但你的答案绝对值得一看!感谢您提供Numba方法。 - tupui
1
Numba是最后的手段啊! ;) - Divakar
@MSeifert,关于广播,为什么Numba更快?我认为在NumPy后面的是C,所以对我来说不应该有加速。 - tupui
然而,numba并不支持所有(实际上只支持少数)Python和NumPy操作,并且需要大量的试错才能使操作尽可能快。在这种情况下,这很容易,但通常很难找到一个比NumPy实现实际上更快(多得多)的numba实现。 - MSeifert
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接