Python 3D插值加速

8
我有如下代码用于插值3D体积数据。
Y, X, Z = np.shape(volume)
xs = np.arange(0, X)
ys = np.arange(0, Y)
zs = np.arange(0, Z)

points = list(zip(np.ravel(result[:, :, :, 1]), np.ravel(result[:, :, :, 0]), np.ravel(result[:, :, :, 2])))
interp = interpolate.RegularGridInterpolator((ys, xs, zs), volume,
                                             bounds_error=False, fill_value=0, method='linear')
new_volume = interp(points)
new_volume = np.reshape(new_volume, (Y, X, Z))

这段代码在512x512x110的体积上执行大约需要37秒(约2900万个点),每个体素花费超过1微秒的时间,这对我来说是无法接受的 - 而且它使用了4个核心。调用new_volume=interp(points)大约占据80%的处理时间,而列表创建则占据了几乎全部剩余的时间。
有没有简单(或更复杂)的方法可以使这个计算更快?或者有没有提供更快插值方法的好Python库?我的体积和点在每次调用此程序时都会改变。

我不确定你是否正确地进行了插值(不知道result是什么很难说)。对于这样大小的数组,37秒并不奇怪。通过使用np.c_[ ... ]而不是list(zip( ... ))可以大大加快points生成速度。请查看griddata,它甚至需要更长时间。如果您想在这样的数组中进行更快的插值,可能需要编写自己的CUDA代码并在GPU中加速。 - Imanol Luengo
@Imanol 感谢您提到 np.c 函数 - 我会尝试一下。result[:, :, :, 1] 是 Y 网格,result[:, :, :, 0] 是 X 网格,等等。关于 GPU 实现 - 我也会做这个,但是 - 使用 4 个核进行一个体素插值超过微秒的时间还是有点高。即使能够提高 3-5 倍的速度也将非常棒。 - Nefarin
如果您想在等分网格上进行插值,请尝试使用 scipy.ndimage.map_coordinates。可以在这里找到一个封装。 - Syrtis Major
2
我最近使用RegularGridInterpolator与MATLAB的griddedInterpolant进行比较,发现前者在处理数组体积(我使用的是4D向量值体积)时明显比后者慢。因此,底层实现可能存在问题。 - user664303
2个回答

5

这是稍微修改过的 cython 解决方案:

import numpy as np
cimport numpy as np
from libc.math cimport floor
from cython cimport boundscheck, wraparound, nonecheck, cdivision

DTYPE = np.float
ctypedef np.float_t DTYPE_t

@boundscheck(False)
@wraparound(False)
@nonecheck(False)
def interp3D(DTYPE_t[:,:,::1] v, DTYPE_t[:,:,::1] xs, DTYPE_t[:,:,::1] ys, DTYPE_t[:,:,::1] zs):

    cdef int X, Y, Z
    X,Y,Z = v.shape[0], v.shape[1], v.shape[2]
    cdef np.ndarray[DTYPE_t, ndim=3] interpolated = np.zeros((X, Y, Z), dtype=DTYPE)

    _interp3D(&v[0,0,0], &xs[0,0,0], &ys[0,0,0], &zs[0,0,0], &interpolated[0,0,0], X, Y, Z)
    return interpolated


@cdivision(True)
cdef inline void _interp3D(DTYPE_t *v, DTYPE_t *x_points, DTYPE_t *y_points, DTYPE_t *z_points, 
               DTYPE_t *result, int X, int Y, int Z):

    cdef:
        int i, x0, x1, y0, y1, z0, z1, dim
        DTYPE_t x, y, z, xd, yd, zd, c00, c01, c10, c11, c0, c1, c

    dim = X*Y*Z

    for i in range(dim):
        x = x_points[i]
        y = y_points[i]
        z = z_points[i]

        x0 = <int>floor(x)
        x1 = x0 + 1
        y0 = <int>floor(y)
        y1 = y0 + 1
        z0 = <int>floor(z)
        z1 = z0 + 1

        xd = (x-x0)/(x1-x0)
        yd = (y-y0)/(y1-y0)
        zd = (z-z0)/(z1-z0)

        if x0 >= 0 and y0 >= 0 and z0 >= 0:
            c00 = v[Y*Z*x0+Z*y0+z0]*(1-xd) + v[Y*Z*x1+Z*y0+z0]*xd
            c01 = v[Y*Z*x0+Z*y0+z1]*(1-xd) + v[Y*Z*x1+Z*y0+z1]*xd
            c10 = v[Y*Z*x0+Z*y1+z0]*(1-xd) + v[Y*Z*x1+Z*y1+z0]*xd
            c11 = v[Y*Z*x0+Z*y1+z1]*(1-xd) + v[Y*Z*x1+Z*y1+z1]*xd

            c0 = c00*(1-yd) + c10*yd
            c1 = c01*(1-yd) + c11*yd

            c = c0*(1-zd) + c1*zd

        else:
            c = 0

        result[i] = c 

结果仍与您的相同。使用一个随机网格数据60x60x60,我得到以下时间:
SciPy's solution:                982ms
Your cython solution:            24.7ms
Above modified cython solution:  8.17ms

所以,它比你的 cython 解决方案快近4倍。请注意:

  1. Cython默认在数组上执行边界检查,如果需要该功能,请删除@boundscheck(False)
  2. 在上述解决方案中,数组必须是C连续的。
  3. 如果您需要以上代码的并行变量,请在for循环中用prange替换range

希望这能有所帮助。


谢谢,确实更快了,但计算结果错误(原始解决方案/ scipy插值和此版本之间的绝对差异太大),并且在更大的网格(例如200x200x20)上崩溃。 - Nefarin
@Nefarin 代码中有几个错别字。我不得不将一个 + 改为 *,现在结果与你的相同。正如我之前提到的,确保传递给此函数的数组是“C连续”的。 - romeric
谢谢,现在它可以工作了,在释放GIL并使用prange后,速度甚至更快。 - Nefarin
我正在寻找一些用于插值函数f(x,y)的工具。这段代码片段是用来做这个的吗?我很难理解你在这里做了什么,以及你使用了哪种插值方法。 - ari

0

我使用了Cython来加速这个程序,并实现了以下代码:

import numpy as np
cimport numpy as np
np.import_array()
from libc.math cimport ceil, floor

DTYPE = np.float
ctypedef np.float_t DTYPE_t

def interp3(np.ndarray[DTYPE_t, ndim=3] x_grid, np.ndarray[DTYPE_t, ndim=3] y_grid,
    np.ndarray[DTYPE_t, ndim=3] z_grid, np.ndarray[DTYPE_t, ndim=3] v,
    np.ndarray[DTYPE_t, ndim=3] xs, np.ndarray[DTYPE_t, ndim=3] ys, 
    np.ndarray[DTYPE_t, ndim=3] zs):

    cdef int i
    cdef float x
    cdef float y
    cdef float z
    cdef int x0
    cdef int x1
    cdef int y0
    cdef int y1
    cdef int z0
    cdef int z1
    cdef float xd
    cdef float yd
    cdef float zd
    cdef float c00
    cdef float c01
    cdef float c10
    cdef float c11
    cdef float c0
    cdef float c1
    cdef float c
    cdef int X
    cdef int Y
    cdef int Z

    X, Y, Z = np.shape(x_grid)

    cdef np.ndarray[DTYPE_t, ndim=1] x_points = np.ravel(xs)
    cdef np.ndarray[DTYPE_t, ndim=1] y_points = np.ravel(ys)
    cdef np.ndarray[DTYPE_t, ndim=1] z_points = np.ravel(zs)
    cdef np.ndarray[DTYPE_t, ndim=1] result = np.empty((len(x_points)), dtype=DTYPE)

    for i in range(len(x_points)):
        x = x_points[i]
        y = y_points[i]
        z = z_points[i]

        x0 = int(floor(x))
        x1 = x0 + 1
        y0 = int(floor(y))
        y1 = y0 + 1
        z0 = int(floor(z))
        z1 = z0 + 1

        xd = (x-x0)/(x1-x0)
        yd = (y-y0)/(y1-y0)
        zd = (z-z0)/(z1-z0)

        try:
            assert x0 >= 0 and y0 >= 0 and z0 >= 0
            c00 = v[x0, y0, z0]*(1-xd) + v[x1, y0, z0]*xd
            c01 = v[x0, y0, z1]*(1-xd) + v[x1, y0, z1]*xd
            c10 = v[x0, y1, z0]*(1-xd) + v[x1, y1, z0]*xd
            c11 = v[x0, y1, z1]*(1-xd) + v[x1, y1, z1]*xd

            c0 = c00*(1-yd) + c10*yd
            c1 = c01*(1-yd) + c11*yd

            c = c0*(1-zd) + c1*zd
        except:
            c = 0

        result[i] = c

    cdef np.ndarray[DTYPE_t, ndim=3] interpolated = np.zeros((X, Y, Z), dtype=DTYPE)
    interpolated = np.reshape(result, (X, Y, Z))
    return interpolated  

这是我第一次使用Cython,所以有以下问题:

  1. 如何进一步优化代码?

  2. 是否有简单的方法避免使用try和assert语句来检查数组边界?尝试使用min/max组合来确保边界比使用try/assert方法更慢。

目前,它比上面发布的原始代码快8倍左右。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接