快速插值三维数组

12

我有一个三维数组,需要在其中的一个轴上进行插值(最后一个维度)。假设 y.shape = (nx, ny, nz),我想在每个(nx, ny)中对nz进行插值。然而,我希望为每个[i,j]插值到不同的值。

这是一些示例代码。如果我想要插值到单个值,比如说new_z,我可以像下面这样使用scipy.interpolate.interp1d

# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a number
f = scipy.interpolate.interp1d(x, y, axis=-1, kind='linear')
result = f(new_z)

然而,对于这个问题,我实际上想要做的是针对每个y[i, j]插值到不同的new_z。所以我这样做:

# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a 2D array
result = numpy.empty(y.shape[:-1])
for i in range(nx):
    for j in range(ny):
        f = scipy.interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
        result[i, j] = f(new_z[i, j])

很不幸,使用多个循环会变得低效和缓慢。有没有更好的方法来进行这种插值?线性插值就足够了。一种可能的方法是在Cython中实现它,但我想避免这样做,因为我想要灵活地改变到三次插值,并且不想在Cython中手动完成。

6个回答

8
为了加快高阶插值速度,您可以只调用一次interp1d(),然后使用_fitpack模块中的_spline属性和低级函数_bspleval()。以下是代码:
from scipy.interpolate import interp1d
import numpy as np

nx, ny, nz = 30, 40, 50
x = np.arange(0, nz, 1.0)
y = np.random.randn(nx, ny, nz)
new_x = np.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0

def original_interpolation(x, y, new_x):
    result = np.empty(y.shape[:-1])
    for i in xrange(nx):
        for j in xrange(ny):
            f = interp1d(x, y[i, j], axis=-1, kind=3)
            result[i, j] = f(new_x[i, j])
    return result

def fast_interpolation(x, y, new_x):
    from scipy.interpolate._fitpack import _bspleval
    f = interp1d(x, y, axis=-1, kind=3)
    xj,cvals,k = f._spline
    result = np.empty_like(new_x)
    for (i, j), value in np.ndenumerate(new_x):
        result[i, j] = _bspleval(value, x, cvals[:, i, j], k, 0)
    return result

r1 = original_interpolation(x, y, new_x)
r2 = fast_interpolation(x, y, new_x)

>>> np.allclose(r1, r2)
True

%timeit original_interpolation(x, y, new_x)
%timeit fast_interpolation(x, y, new_x)
1 loops, best of 3: 3.78 s per loop
100 loops, best of 3: 15.4 ms per loop

谢谢。您的解决方案也非常有意思。我惊讶于有这么多好答案。不幸的是,我只能接受一个。虽然您的解决方案没有Cython或@pv的方案速度快,但它更适合问题的框架,并且在插值类型方面最灵活。因此我接受了它。 - tiago
我正在尝试运行这段代码,但是出现了错误“BSpline”对象不可迭代。 - Delosari

4

我认为interp1d没有一种快速执行此操作的方法,因此您无法避免循环。

使用np.searchsorted编写线性插值,您可能仍然可以避免Cython,类似于以下内容(未经测试):

def interp3d(x, y, new_x):
    assert x.ndim == 1 and y.ndim == 3 and new_x.ndim == 2
    assert y.shape[:2] == new_x.shape and x.shape == y.shape[2:]

    nx, ny = y.shape[:2]
    new_x = new_x.ravel()
    j = np.arange(len(new_x))
    k = np.searchsorted(x, new_x).clip(1, len(x) - 1)
    y = y.reshape(-1, x.shape[0])
    p = (new_x - x[k-1]) / (x[k] - x[k-1])
    result = (1 - p) * y[j,k-1] + p * y[j,k]
    return result.reshape(nx, ny)

不过这对立方插值没有帮助。

编辑:将其制作成一个函数并修复了一些偏移错误。一些与Cython的时间比较(500x500x500网格):

In [58]: %timeit interp3d(x, y, new_x)
10 loops, best of 3: 82.7 ms per loop

In [59]: %timeit cyfile.interp3d(x, y, new_x)
10 loops, best of 3: 86.3 ms per loop

In [60]: abs(interp3d(x, y, new_x) - cyfile.interp3d(x, y, new_x)).max()
Out[60]: 2.2204460492503131e-16

虽然可以说Cython代码更易于阅读。

谢谢,使用numpy确实是一种优雅的方式。我最终选择了一个快速的Cython解决方案(请参见我的答案)。编写Cython比等待Python版本运行更快。 - tiago
@pv. 你可以通过将插值作为向量操作执行来避免循环。 - Henry Gomersall
@HenryGomersall:是的(我在上面的代码中试过了),但我的意思是,如果你想坚持使用interp1d,那么这是不可能的。 - pv.

4

由于上面的numpy建议时间过长,我等不及了,所以在此提供cython版本以备将来参考。根据一些松散的基准测试,它大约快了3000倍(当然,它只是线性插值,并且不像interp1d那样做得多,但对于这个目的来说还可以)。

import numpy as N
cimport numpy as N
cimport cython

DTYPEf = N.float64
ctypedef N.float64_t DTYPEf_t

@cython.boundscheck(False) # turn of bounds-checking for entire function
@cython.wraparound(False)  # turn of bounds-checking for entire function
cpdef interp3d(N.ndarray[DTYPEf_t, ndim=1] x, N.ndarray[DTYPEf_t, ndim=3] y,
               N.ndarray[DTYPEf_t, ndim=2] new_x):
    """
    interp3d(x, y, new_x)

    Performs linear interpolation over the last dimension of a 3D array,
    according to new values from a 2D array new_x. Thus, interpolate
    y[i, j, :] for new_x[i, j].

    Parameters
    ----------
    x : 1-D ndarray (double type)
        Array containg the x (abcissa) values. Must be monotonically
        increasing.
    y : 3-D ndarray (double type)
        Array containing the y values to interpolate.
    x_new: 2-D ndarray (double type)
        Array with new abcissas to interpolate.

    Returns
    -------
    new_y : 3-D ndarray
        Interpolated values.
    """
    cdef int nx = y.shape[0]
    cdef int ny = y.shape[1]
    cdef int nz = y.shape[2]
    cdef int i, j, k
    cdef N.ndarray[DTYPEf_t, ndim=2] new_y = N.zeros((nx, ny), dtype=DTYPEf)

    for i in range(nx):
        for j in range(ny):
            for k in range(1, nz):
                 if x[k] > new_x[i, j]:
                     new_y[i, j] = (y[i, j, k] - y[i, j, k - 1]) * \
                  (new_x[i, j] - x[k-1]) / (x[k] - x[k - 1]) + y[i, j, k - 1]
                     break
    return new_y

3

@pv 的答案 基础上进行优化,将内部循环向量化,以下代码可以显著提高速度(编辑:将昂贵的 numpy.tile 替换为使用 numpy.lib.stride_tricks.as_strided):

import numpy
from scipy import interpolate

nx = 30
ny = 40
nz = 50

y = numpy.random.randn(nx, ny, nz)
x = numpy.float64(numpy.arange(0, nz))

# We select some locations in the range [0.1, nz-0.1]
new_z = numpy.random.random_integers(1, (nz-1)*10, size=(nx, ny))/10.0

# y is a 3D ndarray
# x is a 1D ndarray with the abcissa values
# new_z is a 2D array

def original_interpolation():
    result = numpy.empty(y.shape[:-1])
    for i in range(nx):
        for j in range(ny):
            f = interpolate.interp1d(x, y[i, j], axis=-1, kind='linear')
            result[i, j] = f(new_z[i, j])

    return result

grid_x, grid_y = numpy.mgrid[0:nx, 0:ny]
def faster_interpolation():
    flat_new_z = new_z.ravel()
    k = numpy.searchsorted(x, flat_new_z)
    k = k.reshape(nx, ny)

    lower_index = [grid_x, grid_y, k-1]
    upper_index = [grid_x, grid_y, k]

    tiled_x = numpy.lib.stride_tricks.as_strided(x, shape=(nx, ny, nz), 
        strides=(0, 0, x.itemsize))

    z_upper = tiled_x[upper_index]
    z_lower = tiled_x[lower_index]

    z_step = z_upper - z_lower
    z_delta = new_z - z_lower

    y_lower = y[lower_index]
    result = y_lower + z_delta * (y[upper_index] - y_lower)/z_step

    return result

# both should be the same (giving a small difference)
print numpy.max(
        numpy.abs(original_interpolation() - faster_interpolation()))

这在我的机器上产生了以下时间:
In [8]: timeit foo.original_interpolation()
10 loops, best of 3: 102 ms per loop

In [9]: timeit foo.faster_interpolation()
1000 loops, best of 3: 564 us per loop

nx = 300ny = 300nz = 500应用于IT技术领域,可以获得130倍的加速效果:

In [2]: timeit original_interpolation()
1 loops, best of 3: 8.27 s per loop

In [3]: timeit faster_interpolation()
10 loops, best of 3: 60.1 ms per loop

你需要编写自己的算法来进行三次插值,但这并不难。


2
你可以使用map_coordinates来实现这个功能,具体请参考文档:map_coordinates
from numpy import random, meshgrid, arange
from scipy.ndimage import map_coordinates

(nx, ny, nz) = (4, 5, 6)
# some random array
A = random.rand(nx, ny, nz)

# random floating-point indices in [0, nz-1]
Z = random.rand(nx, ny)*(nz-1)

# regular integer indices of shape (nx,ny)
X, Y = meshgrid(arange(nx), arange(ny), indexing='ij')

coords = (X, Y, Z) # X, Y, and Z are of shape (nx, ny)

print map_coordinates(A, coords, order=1, cval=-999.)   

我考虑过使用map_coordinates,谢谢你的建议。在我的情况下,nx, ny, nz每个都接近500,所以我认为map_coordinates可能会在RAM上有点贪婪。我将运行一些基准测试并报告。 - tiago

2

虽然有几个不错的答案,但它们仍在固定长度为500的数组中进行250k次插值:

j250k = np.searchsorted( X500, X250k )  # indices in [0, 500)

这可以通过使用一个包含5k个插槽的LUT(查找表)来加快速度:

lut = np.interp( np.arange(5000), X500, np.arange(500) ).round().astype(int)
xscale = (X - X.min()) * (5000 - 1) \
        / (X.max() - X.min()) 
j = lut.take( xscale.astype(int), mode="clip" )  # take(floats) in numpy 1.7 ?

#---------------------------------------------------------------------------
# X     |    |       | |             |
# j     0    1       2 3             4 ...
# LUT   |....|.......|.|.............|....  -> int j (+ offset in [0, 1) )
#---------------------------------------------------------------------------

searchsorted很快,时间约为ln2 500,因此这可能并不比它快太多。
但是在C中,LUT非常快,这是一个简单的速度/内存权衡。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接