如何加速numpy代码

6

我有以下代码。原则上,它需要执行2^6 * 1000 = 64000次迭代,这是一个相当小的数量。但在我的电脑上需要9秒钟,而我想至少运行n = 15。

from __future__ import division
import numpy as np
import itertools

n=6
iters = 1000
firstzero = 0
bothzero = 0
for S in itertools.product([-1,1], repeat = n+1):
    for i in xrange(iters):
        F = np.random.choice(np.array([-1,0,0,1], dtype=np.int8), size = n)
        while np.all(F ==0):
            F = np.random.choice(np.array([-1,0,0,1], dtype=np.int8), size = n)
        FS = np.convolve(F,S, 'valid')
        if (FS[0] == 0):
            firstzero += 1
        if np.all(FS==0):
            bothzero += 1

print "firstzero",    firstzero
print "bothzero",  bothzero

这个是否可能大幅度加速,还是需要用C重写?

分析表明它的大部分时间都花在了

   258003    0.418    0.000    3.058    0.000 fromnumeric.py:1842(all)
   130003    1.245    0.000    2.907    0.000 {method 'choice' of 'mtrand.RandomState' objects}
   388006    2.488    0.000    2.488    0.000 {method 'reduce' of 'numpy.ufunc' objects}
   128000    0.731    0.000    2.215    0.000 numeric.py:873(convolve)
   258003    0.255    0.000    2.015    0.000 {method 'all' of 'numpy.ndarray' objects}
   258003    0.301    0.000    1.760    0.000 _methods.py:35(_all)
   130003    0.470    0.000    1.663    0.000 fromnumeric.py:2249(prod)
   644044    1.483    0.000    1.483    0.000 {numpy.core.multiarray.array}
   130003    0.164    0.000    1.193    0.000 _methods.py:27(_prod)
   258003    0.283    0.000    0.624    0.000 numeric.py:462(asanyarray)

1
你能解释一下这段代码在做什么吗? - YXD
@MrE 它正在计算两个随机数组的卷积次数,其中一个数组比另一个数组长一个单位,并且具有特定概率分布,在第一个位置处有一个0或在两个位置都有0的次数。 - Simd
2
只是提醒一下。通过改进代码,你只能获得线性加速。除非你跳过O(2^n)的复杂度,否则你不会走得太远,除非你获得像99%这样的加速。n=15将始终比n=62^15 / 2^6 = 2^9倍,因此为了保持相同的时间,实际上需要512倍的加速。 - luk32
@luk32 是的。我怀疑如果用C编写且由比我更优秀的程序员编写,它目前可以快512倍。每次迭代它并没有做很多事情。 - Simd
@user2179021 我真的有所怀疑 =)。在我看来,通过改变语言/工具,你最多只能得到10个(质数),请注意,你已经在使用专门的工具,程序并不会花费时间在你的代码上。正如John建议的那样,也许使用不同的工具是前进的道路。但我认为您可能需要并行化程序以获得更大的因子,并实现更大的问题规模......并且您仍然会很快撞上O(2^n)的墙。但祝你好运。也许n=15到20将在合理的时间内可以计算。 - luk32
2个回答

12

你的代码几乎完全向量化后速度会更快(16.9%),假设你的函数名为 f():

def g():
        n=6
        iters = 1000
        S=np.repeat(list(itertools.product([-1,1], repeat = n+1)),iters, axis=0).reshape((-1,n+1))
        F=np.random.choice(np.array([-1,0,0,1], dtype=np.int8), size = (iters*(2**(n+2)),n)) #oversampling
        F=F[~(F==0).all(1)][:iters*(2**(n+1))]
        FS=np.asanyarray(map(lambda x, y: np.convolve(x, y, 'valid'), F, S))
        firstzero=(FS[:,0]==0).sum()
        bothzero=(FS==0).all(1).sum()
        print "firstzero",    firstzero
        print "bothzero",  bothzero

计时结果:

In [164]:

%timeit f()
firstzero 27171
bothzero 12151
firstzero 27206
bothzero 12024
firstzero 27272
bothzero 12135
firstzero 27173
bothzero 12079
1 loops, best of 3: 14.6 s per loop
In [165]:

%timeit g()
firstzero 27182
bothzero 11952
firstzero 27365
bothzero 12174
firstzero 27318
bothzero 12173
firstzero 27377
bothzero 12072
1 loops, best of 3: 2.47 s per loop

太好了。谢谢! - Simd
1
不客气。我认为首先尝试向量化,然后再考虑 cfortran 或其他语言。许多在 numpyscipy 中的东西都已经在底层实现了,因此,在至少一半的时间内只需要进行向量化就足够了。 - CT Zhu

5

我通过一次性生成所有随机选择的方法轻松获得了35-40%的加速:

for S in itertools.product([-1,1], repeat = n+1):
    Fx = np.random.choice(np.array([-1,0,0,1], dtype=np.int8), size=(iters,n))                                       
        for F in Fx:

这将替换掉循环 for i in xrange(iters)

为了超越这一点,我怀疑你可以使用scipy.signal.fftconvolve来向量化卷积本身(np.convolve仅支持1D输入)。 我没有尝试过这个方法,部分原因是在我撰写此文时,scipy.org已经离线,但我希望这足以让您前进。 主要的想法是尽可能减少您在Python中执行的循环,并用向量化操作替换它们。


谢谢。我对scipy.signal.fftconvolve不太理解。你是在建议进行二维卷积吗? - Simd
йӮЈж ·еҒҡеҮ д№ҺжҳҜдёҚиЎҢзҡ„гҖӮдҪ зңӢпјҢеҝ…йЎ»жҺ’йҷӨnp.all(F ==0)зҡ„иЎҢгҖӮ Fxеҝ…йЎ»з”ҹжҲҗжҜ”(iters, n)жӣҙеӨ§зҡ„е°әеҜёгҖӮ - CT Zhu
@user2179021 通过FFT进行卷积在处理更大的数组尺寸时比常规卷积更具扩展性。请参考此页面了解详情:http://www.dspguide.com/ch18.htm - ArtemB

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接