在Cython中创建小型数组需要大量时间。

8
我正在为numpy编写一个新的随机数生成器,可以根据任意分布产生随机数。但是我发现了一个非常奇怪的行为:
这是test.pyx文件。
#cython: boundscheck=False
#cython: wraparound=False
import numpy as np
cimport numpy as np
cimport cython

def BareBones(np.ndarray[double, ndim=1] a,np.ndarray[double, ndim=1] u,r):
    return u

def UntypedWithLoop(a,u,r):
    cdef int i,j=0
    for i in range(u.shape[0]):
        j+=i
    return u,j

def BSReplacement(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u):
    cdef np.ndarray[np.int_t, ndim=1] r=np.empty(u.shape[0],dtype=int)
    cdef int i,j=0
    for i in range(u.shape[0]):
        j=i
    return r

setup.py

from distutils.core import setup
from Cython.Build import cythonize
setup(name = "simple cython func",ext_modules = cythonize('test.pyx'),)

代码性能分析
#!/usr/bin/python
from __future__ import division

import subprocess
import timeit

#Compile the cython modules before importing them
subprocess.call(['python', 'setup.py', 'build_ext', '--inplace'])

sstr="""
import test
import numpy
u=numpy.random.random(10)
a=numpy.random.random(10)
a=numpy.cumsum(a)
a/=a[-1]
r=numpy.empty(10,int)
"""

print "binary search: creates an array[N] and performs N binary searches to fill it:\n",timeit.timeit('numpy.searchsorted(a,u)',sstr)
print "Simple replacement for binary search:takes the same args as np.searchsorted and similarly returns a new array. this performs only one trivial operation per element:\n",timeit.timeit('test.BSReplacement(a,u)',sstr)

print "barebones function doing nothing:",timeit.timeit('test.BareBones(a,u,r)',sstr)
print "Untyped inputs and doing N iterations:",timeit.timeit('test.UntypedWithLoop(a,u,r)',sstr)
print "time for just np.empty()",timeit.timeit('numpy.empty(10,int)',sstr)

二分查找实现需要执行len(u)*Log(len(a))的时间。简单的Cython函数需要执行len(u)的时间才能运行。两者都返回长度为len(u)的1D整数数组。
然而,即使这个没有计算的简单实现比numpy库中完整的二分搜索还要慢。(它是用C编写的:https://github.com/numpy/numpy/blob/202e78d607515e0390cffb1898e11807f117b36a/numpy/core/src/multiarray/item_selection.c请参见PyArray_SearchSorted)
结果如下:
binary search: creates an array[N] and performs N binary searches to fill it:
1.15157485008
Simple replacement for binary search:takes the same args as np.searchsorted and similarly returns a new array. this performs only one trivial operation per element:
3.69442796707
barebones function doing nothing: 0.87496304512
Untyped inputs and doing N iterations: 0.244267940521
time for just np.empty() 1.0983929634

为什么np.empty()步骤要花费如此多的时间?我该怎么做才能得到一个我可以返回的空数组?
C函数在内部循环中使用更长的算法并运行一堆健全性检查。 (我从示例中删除了除循环本身以外的所有逻辑)
更新:
事实证明有两个不同的问题:
1.仅调用np.empty(10)就具有巨大的开销,并且需要与searchsorted一起执行10次二分搜索,这需要花费相同的时间
2.仅声明缓冲区语法np.ndarray[...]也具有巨大的开销,它所需的时间比接收未经类型定义的变量并迭代50次还要长。
50次迭代结果:
binary search: 2.45336699486
Simple replacement:3.71126317978
barebones function doing nothing: 0.924916028976
Untyped inputs and doing N iterations: 0.316384077072
time for just np.empty() 1.04949498177

当你将importcimportnumpy命名为相同的名称时,会感到困惑,在scikits image中,他们通常会执行import numpy as np; cimport numpy as cnp来区分它们。但我认为你在调用np.empty时的np是被import的那个,而且没有被cimport,所以这是一个Python函数调用,具有其众所周知的开销。你可以通过Cython调用PyArray_SimpleNew来避免它,但不确定如何操作。如果你担心这种级别的优化,就放弃Cython,全程使用C-API吧... - Jaime
1
@Jaime 导入并使用别名 np 的 numpy 是标准用法,尽管有些令人困惑。我也见过你提到的那种方式。http://docs.cython.org/src/tutorial/numpy.html#adding-types 我认为 cython 文档建议的方式可能会在可用时将 cython 变量重新绑定到相同的名称,就像在 python 中发生标准名称空间冲突时一样。 - JoshAdel
@JoshAdel 我的观点是,我不确定调用np.empty是否会进行Python函数调用,这可能会解释开销,或者是Cython变体,这将表明Cython中的某些内容并不好。但我所编写的唯一Cython代码是来自文档的“Hello World!”:我发现它很令人困惑,主要是因为很难弄清楚某些东西是在快速的C中运行还是在缓慢的Python中运行,并且一直转向Python / NumPy C-API。因此,我的观点是有偏见的,而且不是很了解... - Jaime
1
@Jaime 我总是发现使用 cython -a 很有帮助,它可以得到一个注释版本的代码,对调用 Python API 的行进行逐行着色,并允许您选择一行并查看相应生成的 C 代码。 - JoshAdel
2个回答

2

1
在Cython函数内创建np.empty会增加一些开销,正如你已经看到的那样。下面是一个示例,展示如何创建空数组并将其传递给Cython模块以填充正确的值: n=10:
numpy.searchsorted: 1.30574745517
cython O(1): 3.28732016088
cython no array declaration 1.54710909596

n=100:

numpy.searchsorted: 4.15200545373
cython O(1): 13.7273431067
cython no array declaration 11.4186086744

正如你已经指出的那样,由于其为O(len(u)*long(len(a))),因此numpy版本的规模扩展得更好,而此处的算法则为O(len(u)*len(a))...。
我还尝试过使用Memoryview,基本上是通过将np.ndarray[double, ndim=1]替换为double[:]来实现的,但在这种情况下第一种选项更快。
新的.pyx文件如下:
from __future__ import division
import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def JustLoop(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u,
             np.ndarray[int, ndim=1] r):
    cdef int i,j
    for j in range(u.shape[0]):
        if u[j] < a[0]:
            r[j] = 0
            continue

        if u[j] > a[a.shape[0]-1]:
            r[j] = a.shape[0]-1
            continue

        for i in range(1, a.shape[0]):
            if u[j] >= a[i-1] and u[j] < a[i]:
                r[j] = i
                break

@cython.boundscheck(False)
@cython.wraparound(False)
def WithArray(np.ndarray[double, ndim=1] a, np.ndarray[double, ndim=1] u):
    cdef np.ndarray[np.int_t, ndim=1] r=np.empty(u.shape[0],dtype=int)
    cdef int i,j
    for j in range(u.shape[0]):
        if u[j] < a[0]:
            r[j] = 0
            continue

        if u[j] > a[a.shape[0]-1]:
            r[j] = a.shape[0]-1
            continue

        for i in range(1, a.shape[0]):
            if u[j] >= a[i-1] and u[j] < a[i]:
                r[j] = i
                break
    return r

新的 .py 文件:
import numpy
import subprocess
import timeit

#Compile the cython modules before importing them
subprocess.call(['python', 'setup.py', 'build_ext', '--inplace'])
from test import *

sstr="""
import test
import numpy
u=numpy.random.random(10)
a=numpy.random.random(10)
a=numpy.cumsum(a)
a/=a[-1]
a.sort()
r = numpy.empty(u.shape[0], dtype=int)
"""

print "numpy.searchsorted:",timeit.timeit('numpy.searchsorted(a,u)',sstr)
print "cython O(1):",timeit.timeit('test.WithArray(a,u)',sstr)
print "cython no array declaration",timeit.timeit('test.JustLoop(a,u,r)',sstr)

1
  1. 累加和步骤生成递增序列,因此不需要排序。(2) 缩放是因为您使用了线性搜索算法,而numpy函数很可能使用O(logn)二分搜索。(3) 我省略了代码的实际工作部分,以便只研究开销。
- staticd
请查看此处:http://docs.scipy.org/doc/numpy/reference/generated/numpy.searchsorted.html,正如您所见,`a` 必须按升序排序,或者至少您必须使用 sorter 参数传递其 argsort - Saullo G. P. Castro
1
a=numpy.cumsum(a) 会生成一个升序序列,因为它是一个累积和。(out[i]=in[i]+out[i-1]) 这一步从一些随机概率生成了一个累积分布。我们可以使用 searchsorted 来获取相应于原始 "a" 的概率的索引,从而得到一个反向的结果。 - staticd
谢谢,我没有足够注意到那个事实,所以在这里 sort() 调用是不必要的。 - Saullo G. P. Castro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接