Cython Gibbs采样器比numpy的稍慢。

3
我已经实现了一个 Gibbs 抽样器来生成纹理图像。根据 beta 参数(形状为(4)的数组),我们可以生成各种纹理。
以下是我使用 Numpy 的初始函数:
def gibbs_sampler(img_label, betas, burnin, nb_samples):
    nb_iter = burnin + nb_samples

    lst_samples = []

    labels = np.unique(img)

    M, N = img.shape
    img_flat = img.flatten()

    # build neighborhood array by means of numpy broadcasting:
    m, n = np.ogrid[0:M, 0:N]

    top_left, top, top_right =   m[0:-2, :]*N + n[:, 0:-2], m[0:-2, :]*N + n[:, 1:-1]  , m[0:-2, :]*N + n[:, 2:]
    left, pix, right = m[1:-1, :]*N + n[:, 0:-2],  m[1:-1, :]*N + n[:, 1:-1], m[1:-1, :]*N + n[:, 2:]
    bottom_left, bottom, bottom_right = m[2:, :]*N + n[:, 0:-2],  m[2:, :]*N + n[:, 1:-1], m[2:, :]*N + n[:, 2:]

    mat_neigh = np.dstack([pix, top, bottom, left, right, top_left, bottom_right, bottom_left, top_right])

    mat_neigh = mat_neigh.reshape((-1, 9))    
    ind = np.arange((M-2)*(N-2))  

    # loop over iterations
    for iteration in np.arange(nb_iter):

        np.random.shuffle(ind)

        # loop over pixels
        for i in ind:                  

            truc = map(functools.partial(lambda label, img_flat, mat_neigh : 1-np.equal(label, img_flat[mat_neigh[i, 1:]]).astype(np.uint), img_flat=img_flat, mat_neigh=mat_neigh), labels)
            # bidule is of shape (4, 2, labels.size)
            bidule = np.array(truc).T.reshape((-1, 2, labels.size))

            # theta is of shape (labels.size, 4) 
            theta = np.sum(bidule, axis=1).T
            # prior is thus an array of shape (labels.size)
            prior = np.exp(-np.dot(theta, betas))

            # sample from the posterior
            drawn_label = np.random.choice(labels, p=prior/np.sum(prior))

            img_flat[(i//(N-2) + 1)*N + i%(N-2) + 1] = drawn_label


        if iteration >= burnin:
            print('Iteration %i --> sample' % iteration)
            lst_samples.append(copy.copy(img_flat.reshape(M, N)))

        else:
            print('Iteration %i --> burnin' % iteration)

    return lst_samples

由于循环是迭代算法的一部分,因此我们无法摆脱它。因此,我尝试使用Cython(带有静态类型)来加速它:

from __future__ import division
import numpy as np
import copy
cimport numpy as np
import functools
cimport cython

INTTYPE = np.int
DOUBLETYPE = np.double

ctypedef np.int_t INTTYPE_t
ctypedef  np.double_t DOUBLETYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
@cython.nonecheck(False)


def func_for_map(label, img_flat,  mat_neigh, i):

   return  (1-np.equal(label, img_flat[mat_neigh[i, 1:]])).astype(INTTYPE)


def gibbs_sampler(np.ndarray[INTTYPE_t, ndim=2] img_label, np.ndarray[DOUBLETYPE_t, ndim=1] betas, INTTYPE_t burnin=5, INTTYPE_t nb_samples=1):


    assert img_label.dtype == INTTYPE and betas.dtype== DOUBLETYPE

    cdef unsigned int nb_iter = burnin + nb_samples 

    lst_samples = list()

    cdef np.ndarray[INTTYPE_t, ndim=1] labels
    labels = np.unique(img_label)

    cdef unsigned int M, N
    M = img_label.shape[0]
    N = img_label.shape[1]

    cdef np.ndarray[INTTYPE_t, ndim=1] ind     
    ind = np.arange((M-2)*(N-2), dtype=INTTYPE)

    cdef np.ndarray[INTTYPE_t, ndim=1] img_flat
    img_flat = img_label.flatten()


    # build neighborhood array:
    cdef np.ndarray[INTTYPE_t, ndim=2] m
    cdef np.ndarray[INTTYPE_t, ndim=2] n


    m = (np.ogrid[0:M, 0:N][0]).astype(INTTYPE)
    n = (np.ogrid[0:M, 0:N][1]).astype(INTTYPE)



    cdef np.ndarray[INTTYPE_t, ndim=2] top_left, top, top_right, left, pix, right, bottom_left, bottom, bottom_right

    top_left, top, top_right =   m[0:-2, :]*N + n[:, 0:-2], m[0:-2, :]*N + n[:, 1:-1]  , m[0:-2, :]*N + n[:, 2:]
    left, pix, right = m[1:-1, :]*N + n[:, 0:-2],  m[1:-1, :]*N + n[:, 1:-1], m[1:-1, :]*N + n[:, 2:]
    bottom_left, bottom, bottom_right = m[2:, :]*N + n[:, 0:-2],  m[2:, :]*N + n[:, 1:-1], m[2:, :]*N + n[:, 2:]

    cdef np.ndarray[INTTYPE_t, ndim=3] mat_neigh_init
    mat_neigh_init = np.dstack([pix, top, bottom, left, right, top_left, bottom_right, bottom_left, top_right])

    cdef np.ndarray[INTTYPE_t, ndim=2] mat_neigh
    mat_neigh = mat_neigh_init.reshape((-1, 9))    

    cdef unsigned int i
    truc = list()
    cdef np.ndarray[INTTYPE_t, ndim=3] bidule
    cdef np.ndarray[INTTYPE_t, ndim=2] theta
    cdef np.ndarray[DOUBLETYPE_t, ndim=1] prior
    cdef unsigned int drawn_label, iteration       



    # loop over ICE iterations
    for iteration in np.arange(nb_iter):

        np.random.shuffle(ind) 

        # loop over pixels        
        for i in ind:            

            truc = map(functools.partial(func_for_map, img_flat=img_flat, mat_neigh=mat_neigh, i=i), labels)                        


            bidule = np.array(truc).T.reshape((-1, 2, labels.size)).astype(INTTYPE)            


            theta = np.sum(bidule, axis=1).T

            # ok so far

            prior = np.exp(-np.dot(theta, betas)).astype(DOUBLETYPE)
#            print('ok after prior') 
#            return 0
            # sample from the posterior
            drawn_label = np.random.choice(labels, p=prior/np.sum(prior))

            img_flat[(i//(N-2) + 1)*N + i%(N-2) + 1] = drawn_label


        if iteration >= burnin:
            print('Iteration %i --> sample' % iteration)
            lst_samples.append(copy.copy(img_flat.reshape(M, N)))

        else:
            print('Iteration %i --> burnin' % iteration)   



    return lst_samples

然而,最终我得到了几乎相同的计算时间,其中numpy版本比Cython略快。因此,我正在尝试改进Cython的代码。
编辑:
对于两个函数(Cython和非Cython): 我已经替换了:
truc = map(functools.partial(lambda label, img_flat, mat_neigh : 1-np.equal(label, img_flat[mat_neigh[i, 1:]]).astype(np.uint), img_flat=img_flat, mat_neigh=mat_neigh), labels)

通过广播:

truc = 1-np.equal(labels[:, None], img_flat[mat_neigh[i, 1:]][None, :])

现在使用np.einsum计算先验概率,range已全部替换为np.arange。这两个函数比以前都要快,但Python函数仍然比Cython函数略快。


“bidule”和“betas”的形状是什么? - Divakar
请查看我的编辑(第一段代码中的注释)。 - floflo29
这里适用于最近的另一个cython问题的评论,http://stackoverflow.com/questions/40233664/cython-actually-slowing-me-down。 - hpaulj
不,我在一个形状为(labels.size,)的数组上调用np.exp - floflo29
如果您在labels数组中使用二分查找来查找8个相邻像素,则可能将复杂度从O(n_iter * n_pixels * n_labels)降低到O(n_iter * n_pixels * log(n_labels))。也许吧。这需要一个棘手的自定义random.choice方法,并且仅在n_labels很大时才值得使用... - user6758673
显示剩余3条评论
3个回答

3
我已经在您的源代码上运行了Cython注释模式,并查看了结果。也就是说,我将其保存在q.pyx中,并运行了它。
cython -a q.pyx
firefox q.html

(当然,您可以使用任何浏览器)。

代码被染成深黄色,这表明在Cython看来,代码远未静态类型化。据我所知,它可以分为两类。

在某些情况下,您可以更好地静态类型化您的代码:

  1. for iteration in np.arange(nb_iter):for i in ind:中,每次迭代您需要支付约30个C行的代价。请参阅此处以了解如何在Cython中高效访问numpy数组。

  2. truc = map(functools.partial(func_for_map, img_flat=img_flat, mat_neigh=mat_neigh, i=i), labels)中,您并没有从静态类型化中获得任何好处。我建议您cdef函数func_for_map,并在循环中自己调用它。

在其他情况下,您正在调用numpy向量化函数,例如theta = np.sum(bidule, axis=1).Tprior = np.exp(-np.dot(theta, betas)).astype(DOUBLETYPE)等。在这些情况下,Cython真的没有太多好处。


@floflo29 注释模式 Cython 表示不会,就我的理解来说。我建议你尝试一下 - 它实际上会显示生成的 C 代码。正如我所写的,每个循环生成约30个 LOCS,其中一些是相当重的函数调用。 - Ami Tavory
唯一加速的方法是将numpy函数重写为Cython/C吗? - floflo29
@floflo29 numpy函数确实是用Cython/C编写的(或者至少是有效地这样)。 - Ami Tavory
是的,我知道,但为什么Cython告诉我我的代码(特别是在调用Numpy函数时)可以改进? - floflo29
@floflo29 别重写 numpy 函数 - 它们通常非常好。我认为这个答案的要点是,你应该在 cython 中使用 for idx in range(length): 来完成所有循环,并避免 Python 迭代机制(例如 map 或一般的 for i in iterable: 样式循环)。 - DavidW
显示剩余5条评论

2
如果您想加速NumPy代码,我们可以提高内部循环的性能,希望这样能够转化为一些总体改善。所以,我们有以下内容:
theta = np.sum(bidule, axis=1).T
prior = np.exp(-np.dot(theta, betas))

将求和与矩阵乘法合并为一步,我们会得到 -

np.dot(np.sum(bidule, axis=1).T, betas)

现在,这涉及到沿轴求和,然后进行逐元素乘法的总和约减。我们有许多工具可以使用,其中包括np.einsum,特别是因为我们可以一次性执行这些约减操作,如下所示 -

np.einsum('ijk,i->k',bidule,betas)

运行时测试 -

In [98]: # Setup
    ...: N = 100
    ...: bidule = np.random.rand(4,2,N)
    ...: betas = np.random.rand(4)
    ...: 

In [99]: %timeit np.dot(np.sum(bidule, axis=1).T, betas)
100000 loops, best of 3: 12.4 µs per loop

In [100]: %timeit np.einsum('ijk,i->k',bidule,betas)
100000 loops, best of 3: 4.05 µs per loop

In [101]: # Setup
     ...: N = 10000
     ...: bidule = np.random.rand(4,2,N)
     ...: betas = np.random.rand(4)
     ...: 

In [102]: %timeit np.dot(np.sum(bidule, axis=1).T, betas)
10000 loops, best of 3: 157 µs per loop

In [103]: %timeit np.einsum('ijk,i->k',bidule,betas)
10000 loops, best of 3: 90.9 µs per loop

因此,希望在多次迭代运行时,速度提升会更加明显。


好的,我会尽快尝试。您有任何加速Cython代码的想法吗? - floflo29
@floflo29 抱歉,我对Cython方面的东西不是很了解。 - Divakar
没问题,我也在CodeReview上复制了这个主题。 - floflo29
不要对CodeReview期望太高。大多数numpy专家都在这里晃荡。这里可能也有更多的cython知识。CR适用于确保您的Python代码符合PEP8标准,但并不适用于主要加速。此外,您没有提供完整的可测试脚本,只提供了一个函数。 - hpaulj
看看这里和CR上的cython标签。这里有2k个问题,那里只有30个! - hpaulj

1

这个答案很好地解释了为什么Numpy可能效率低下,但你仍然想使用Cython。基本上:

  • 小数组的开销(也包括减少小维度,比如np.sum(bidule, axis=1));
  • 由于中间过程而导致大数组的高速缓存抖动。

在这种情况下,为了从Cython中受益,您必须用普通的Python循环替换Numpy数组操作-Cython必须能够将其翻译成C代码,否则就没有意义。这并不意味着您必须重写所有Numpy函数,您必须要聪明一些。

例如,您应该摆脱mat_neighbidule数组,只需在循环中索引和求和即可。

另一方面,您应该保留(归一化的)prior 数组并继续使用 np.random.choice。没有真正简单的方法可以避免这种情况(好吧..参见 choice 的源代码)。不幸的是,这意味着这部分可能成为性能瓶颈。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接