Python/Cython/Numpy优化np.nonzero函数

5
我有一段代码,我想要优化它。大部分代码执行时间都花在了cdef np.ndarray index = np.argwhere(array==1)上。 其中,array是一个512x512,512的numpy数组,里面只包含0和1。你有什么加速的想法吗?使用Python 2.7和Numpy 1.8.1。
球度函数
def sphericity(self,array):

    #Pass an mask array (1's are marked, 0's ignored)
    cdef np.ndarray index = np.argwhere(array==1)
    cdef int xSize,ySize,zSize
    xSize,ySize,zSize=array.shape

    cdef int sa,vol,voxelIndex,x,y,z,neighbors,xDiff,yDiff,zDiff,x1,y1,z1
    cdef float onethird,twothirds,sp
    sa=vol=0 #keep running tally of volume and surface area
    #cdef int nonZeroCount = (array != 0).sum() #Replaces np.count_nonzero(array) for speed
    for voxelIndex in range(np.count_nonzero(array)):
    #for voxelIndex in range(nonZeroCount):
        x=index[voxelIndex,0]
        y=index[voxelIndex,1]
        z=index[voxelIndex,2]
        #print x,y,z,array[x,y,z]
        neighbors=0
        vol+=1

        for xDiff in [-1,0,1]:
            for yDiff in [-1,0,1]:
                for zDiff in [-1,0,1]:
                    if abs(xDiff)+abs(yDiff)+abs(zDiff)==1:
                        x1=x+xDiff
                        y1=y+yDiff
                        z1=z+zDiff
                        if x1>=0 and y1>=0 and z1>=0 and x1<xSize and y1<ySize and z1<zSize:
                            #print '-',x1,y1,z1,array[x1,y1,z1]
                            if array[x1,y1,z1]:
                                #print '-',x1,y1,z1,array[x1,y1,z1]
                                neighbors+=1

        #print 'had this many neighbors',neighbors
        sa+=(6-neighbors)

    onethird=float(1)/float(3)
    twothirds=float(2)/float(3)
    sph = ((np.pi**onethird)*((6*vol)**twothirds)) / sa
    #print 'sphericity',sphericity
    return sph

性能测试

#Imports
import pstats, cProfile
import numpy as np
import pyximport
pyximport.install(setup_args={"script_args":["--compiler=mingw32"], "include_dirs":np.get_include()}, reload_support=True) #Generate cython version

#Create fake array to calc sphericity
fakeArray=np.zeros((512,512,512))
fakeArray[200:300,200:300,200:300]=1

#Profiling stuff
cProfile.runctx("sphericity(fakeArray)", globals(), locals(), "Profile.prof")
s = pstats.Stats("Profile.prof")
s.strip_dirs().sort_stats("time").print_stats()

性能分析输出

Mon Oct 06 11:49:57 2014    Profile.prof

         12 function calls in 4.373 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    3.045    3.045    4.373    4.373 <string>:1(<module>)
        1    1.025    1.025    1.025    1.025 {method 'nonzero' of 'numpy.ndarray' objects}
        2    0.302    0.151    0.302    0.151 {numpy.core.multiarray.array}
        1    0.001    0.001    1.328    1.328 numeric.py:731(argwhere)
        1    0.000    0.000    0.302    0.302 fromnumeric.py:492(transpose)
        1    0.000    0.000    0.302    0.302 fromnumeric.py:38(_wrapit)
        1    0.000    0.000    0.000    0.000 {method 'transpose' of 'numpy.ndarray' objects}
        1    0.000    0.000    0.302    0.302 numeric.py:392(asarray)
        1    0.000    0.000    0.000    0.000 numeric.py:462(asanyarray)
        1    0.000    0.000    0.000    0.000 {getattr}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}

请说明您的开发环境(操作系统,Python版本,Numpy版本,Cython版本) - Flavian Hautbois
你能提供一下初始化“数组”变量的代码片段吗?最后一个问题:你怎么知道这是瓶颈?你尝试过什么? - Flavian Hautbois
2个回答

7

Jaime可能已经给出了一个好的答案,但我会评论如何改进Cython代码,并添加性能比较。

首先,你应该使用“annotate”功能,cython -a filename.pyx,这将生成一个HTML文件。在浏览器中加载它,它会用黄橙色高亮显示“慢”的行,这表明可以进行改进的地方。

Annotate立即揭示了两件容易修复的事情:

将习惯用语转换为Cython理解的内容

首先,这些行很慢:

        for xDiff in [-1,0,1]:
            for yDiff in [-1,0,1]:
                for zDiff in [-1,0,1]:

这是因为Cython不知道如何将列表迭代转换为干净的c代码。需要将其转换为等效的代码,以便Cython可以进行优化,即“in range”形式:
        for xDiff in range(-1, 2):
            for yDiff in range(-1, 2):
                for zDiff in range(-1, 2):

使用类型数组进行快速索引

接下来,需要注意的是以下代码行速度较慢:

                            if array[x1,y1,z1]:

这是因为array没有被赋予类型。因此,它使用的是Python级别的索引而不是C级别的索引。要解决这个问题,您需要给数组分配一个类型,可以按照以下方式完成:

def sphericity(np.ndarray[np.uint8_t, ndim=3] array):

假设数组类型是'uint8',请替换为适当的类型(注意:Cython不支持“np.bool”类型,因此我使用“uint8”)。
您还可以使用内存视图,在内存视图上无法使用numpy函数,但您可以在数组上创建一个视图,然后索引视图而不是数组。
    cdef np.uint8_t array_view [:, :, :] = array
    ...
                                    if array_view[x1,y1,z1]:

内存视图可能会稍微快一些,并且清晰地区分了数组(Python级别调用)和视图(C级别调用)。如果您不使用任何Numpy函数,则可以毫无问题地使用内存视图。

重写代码以避免对数组进行多次传递

剩下的问题是计算indexnonZeroCount都很慢,这主要是由于数据的大小(基本上,迭代512*512*512个元素需要时间!)。 通常情况下,任何Numpy可以做到的事情,优化后的Cython都可以更快地完成(通常快2-10倍) - Numpy只是为您节省了大量重复造轮子和打字的时间,并让您在更高的层次上思考(如果您不是C程序员,您可能无法足够优化Cython)。但在这种情况下很容易解决,您可以消除indexnonZeroCount以及所有相关代码,只需执行以下操作:

    for x in range(0, xSize):
        for y in range(0, ySize):
            for z in range(0, zSize):
                if array[x,y,z] == 0:
                    continue
                ... 

这是因为c语言非常快速(它可以完美地编译成Cython),每秒处理数十亿操作都不成问题。通过消除“index”和“nonZeroCount”步骤,您基本上为整个数组节省了两个迭代周期,即使在最大速度下,每个迭代周期也需要至少0.1秒。更重要的是CPU缓存,整个数组大小为128mb,比cpu缓存大得多,因此一次性完成所有操作可以更好地利用cpu缓存(如果数组完全适合于cpu缓存,则多次操作并不重要)。
优化版本:
以下是我优化版本的完整代码:
#cython: boundscheck=False, nonecheck=False, wraparound=False
import numpy as np
cimport numpy as np

def sphericity2(np.uint8_t [:, :, :] array):

    #Pass an mask array (1's are marked, 0's ignored)
    cdef int xSize,ySize,zSize
    xSize=array.shape[0]
    ySize=array.shape[1]
    zSize=array.shape[2]

    cdef int sa,vol,x,y,z,neighbors,xDiff,yDiff,zDiff,x1,y1,z1
    cdef float onethird,twothirds,sp
    sa=vol=0 #keep running tally of volume and surface area

    for x in range(0, xSize):
        for y in range(0, ySize):
            for z in range(0, zSize):
                if array[x,y,z] == 0:
                    continue

                neighbors=0
                vol+=1

                for xDiff in range(-1, 2):
                    for yDiff in range(-1, 2):
                        for zDiff in range(-1, 2):
                            if abs(xDiff)+abs(yDiff)+abs(zDiff)==1:
                                x1=x+xDiff
                                y1=y+yDiff
                                z1=z+zDiff
                                if x1>=0 and y1>=0 and z1>=0 and x1<xSize and y1<ySize and z1<zSize:
                                    #print '-',x1,y1,z1,array[x1,y1,z1]
                                    if array[x1,y1,z1]:
                                        #print '-',x1,y1,z1,array[x1,y1,z1]
                                        neighbors+=1

                #print 'had this many neighbors',neighbors
                sa+=(6-neighbors)

    onethird=float(1)/float(3)
    twothirds=float(2)/float(3)
    sph = ((np.pi**onethird)*((6*vol)**twothirds)) / sa
    #print 'sphericity',sphericity
    return sph

球形度执行时间比较:

原始数据         : 2.123秒
Jaime的          : 1.819秒
优化的Cython     : 0.136秒
@ moarningsun    : 0.090秒

所有的Cython解决方案都运行得更快,未展开内部循环时(见注释),它的速度提高了15倍以上,展开后的内部循环则提高了23倍以上。


你可以通过执行诸如 if x > 0: neighbors += array[x-1,y,z] 这样的操作来轻松地摆脱最内层的三个循环,以处理所有六个方向。这样做更快(我试过了),但当然也会失去一些通用性。无论如何,非常好的回答! - user2379410
@moarningsun 我已经添加了消除内部循环和检查的时间。 - Blake Walsh
好的,太好了!按照我的时间表,我发现速度提升对于较小的数组更为明显。对于较大的数组,我猜测内存带宽成为了瓶颈。 - user2379410
问题:使用def sphericity2(np.uint8_t [:, :, :] array)时,我遇到了这个错误,ValueError: Does not understand character buffer dtype format string ('?')。我做错了什么? - Dbricks
@Dbricks 我认为这是因为numpy数组没有被赋予适当的类型,在我的测试代码中,我使用了fakeArray=np.zeros((512,512,512), np.uint8) - Blake Walsh

6
你可以在不需要Cython的情况下,通过使用Vanilla NumPy来实现代码的大部分功能。关键是要以高效的方式计数邻居,这可以通过对从输入数组获取的掩模的切片进行“and”运算来完成。将所有内容组合在一起,我认为以下代码与你的代码实现相同,但重复度更低:
def sphericity(arr):
    mask = arr != 0
    vol = np.count_nonzero(mask)
    counts = np.zeros_like(arr, dtype=np.intp)
    for dim, size in enumerate(arr.shape):
        slc = (slice(None),) * dim
        axis_mask = (mask[slc + (slice(None, -1),)] &
                     mask[slc + (slice(1, None),)])
        counts[slc + (slice(None, -1),)] += axis_mask
        counts[slc + (slice(1, None),)] += axis_mask
    sa = np.sum(6 - counts[counts != 0])

    return np.pi**(1./3.)*(6*vol)**(2./3.) / sa

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接