加速Scipy中对两个不规则网格之间进行多次插值的griddata函数

31
我有几个值在同一个不规则网格 (x, y, z) 上定义,我想将它们插值到一个新网格 (x1, y1, z1) 上。也就是说,我有 f(x,y,z),g(x,y,z),h(x,y,z),我想计算 f(x1,y1,z1),g(x1,y1,z1),h(x1,y1,z1)
目前,我正在使用 scipy.interpolate.griddata 进行这个操作,并且效果很好。但是,因为我必须单独执行每个插值,并且有许多点,所以速度很慢,并且计算中有很多重复(例如查找最近的点、设置网格等)。
是否有一种方法可以加速计算并减少重复计算?例如定义两个网格,然后更改插值的值?

你使用的插值方法是什么,例如nearestlinear等?此外,你的不规则网格中有多少个点? - Jaime
我正在使用线性插值(最近邻插值不够好)。原始网格(x,y,z)由350万个点组成。新网格(x1,y1,z1)由约30万个点组成。在配备健康数量的RAM的i7处理器笔记本电脑上,线性插值需要约30秒的时间。我有6组要插值的值,所以这对我来说是一个主要瓶颈。 - s_haskey
4个回答

57

每次调用scipy.interpolate.griddata时,会发生几件事情:

  1. 首先,调用sp.spatial.qhull.Delaunay对不规则网格坐标进行三角剖分。
  2. 然后,对于新网格中的每个点,搜索三角剖分以找到它位于哪个三角形(实际上是哪个简单形状,在您的3D情况下将位于哪个四面体)中。
  3. 计算每个新网格点相对于外包简单形状顶点的重心坐标。
  4. 使用重心坐标和外包简单形状顶点处函数的值来为该网格点计算插值值。

前三个步骤对所有插值都是相同的,因此如果您可以为每个新网格点存储外包简单形状顶点的索引和插值的权重,那么您将大大减少计算量。但很遗憾,直接使用可用的功能难以实现这一点,虽然确实有可能实现:

import scipy.interpolate as spint
import scipy.spatial.qhull as qhull
import itertools

def interp_weights(xyz, uvw):
    tri = qhull.Delaunay(xyz)
    simplex = tri.find_simplex(uvw)
    vertices = np.take(tri.simplices, simplex, axis=0)
    temp = np.take(tri.transform, simplex, axis=0)
    delta = uvw - temp[:, d]
    bary = np.einsum('njk,nk->nj', temp[:, :d, :], delta)
    return vertices, np.hstack((bary, 1 - bary.sum(axis=1, keepdims=True)))

def interpolate(values, vtx, wts):
    return np.einsum('nj,nj->n', np.take(values, vtx), wts)

interp_weights函数执行了我上面列出的前三步的计算。然后interpolate函数使用这些计算出的值来快速执行第4步:

m, n, d = 3.5e4, 3e3, 3
# make sure no new grid point is extrapolated
bounding_cube = np.array(list(itertools.product([0, 1], repeat=d)))
xyz = np.vstack((bounding_cube,
                 np.random.rand(m - len(bounding_cube), d)))
f = np.random.rand(m)
g = np.random.rand(m)
uvw = np.random.rand(n, d)

In [2]: vtx, wts = interp_weights(xyz, uvw)

In [3]: np.allclose(interpolate(f, vtx, wts), spint.griddata(xyz, f, uvw))
Out[3]: True

In [4]: %timeit spint.griddata(xyz, f, uvw)
1 loops, best of 3: 2.81 s per loop

In [5]: %timeit interp_weights(xyz, uvw)
1 loops, best of 3: 2.79 s per loop

In [6]: %timeit interpolate(f, vtx, wts)
10000 loops, best of 3: 66.4 us per loop

In [7]: %timeit interpolate(g, vtx, wts)
10000 loops, best of 3: 67 us per loop

首先,它与griddata执行的相同,这很好。其次,设置插值,即计算vtxwts与调用griddata大致相同。但是第三个,现在您可以在同一网格上几乎不花时间地插值不同的值。

griddata没有考虑的唯一一件事是将fill_value分配给需要外推的点。您可以通过检查至少有一个权重为负的点来实现这一点,例如:

def interpolate(values, vtx, wts, fill_value=np.nan):
    ret = np.einsum('nj,nj->n', np.take(values, vtx), wts)
    ret[np.any(wts < 0, axis=1)] = fill_value
    return ret

3
太完美了,正是我想要的!非常感谢。如果未来的scipy版本中包含了这种功能,那就太好了。 - s_haskey
对我来说非常好用!在我的机器上多次运行时,它使用的内存也比scipy.interpolate.griddata少得多。 - Matthias123
1
另外,griddata 函数可以容纳函数中的缺失值/空洞 - nan,而这个解决方案无法处理? - FooBar
3
如何使用立方插值(在griddata中只是一个关键字)? - John Smith
1
我发现这种方法会计算出一些不合理的外推点值(在griddata中,外推点将被分配为nan)。我的解决方案是强制将外推点的权重设置为nan,如下所示:extra_idx = simplex == -1; weights = np.hstack((bary, 1 - bary.sum(axis=1, keepdims=True))); weights[extra_idx] = np.nan - Chun-Ye Lu
显示剩余2条评论

7
感谢Jaime提供的解决方案(尽管我并不完全理解重心计算是如何进行的...)
这里将展示一个从他的2D案例改编的示例:
import scipy.interpolate as spint
import scipy.spatial.qhull as qhull
import numpy as np

def interp_weights(xy, uv,d=2):
    tri = qhull.Delaunay(xy)
    simplex = tri.find_simplex(uv)
    vertices = np.take(tri.simplices, simplex, axis=0)
    temp = np.take(tri.transform, simplex, axis=0)
    delta = uv - temp[:, d]
    bary = np.einsum('njk,nk->nj', temp[:, :d, :], delta)
    return vertices, np.hstack((bary, 1 - bary.sum(axis=1, keepdims=True)))

def interpolate(values, vtx, wts):
    return np.einsum('nj,nj->n', np.take(values, vtx), wts)

m, n = 101,201
mi, ni = 1001,2001

[Y,X]=np.meshgrid(np.linspace(0,1,n),np.linspace(0,2,m))
[Yi,Xi]=np.meshgrid(np.linspace(0,1,ni),np.linspace(0,2,mi))

xy=np.zeros([X.shape[0]*X.shape[1],2])
xy[:,0]=Y.flatten()
xy[:,1]=X.flatten()
uv=np.zeros([Xi.shape[0]*Xi.shape[1],2])
uv[:,0]=Yi.flatten()
uv[:,1]=Xi.flatten()

values=np.cos(2*X)*np.cos(2*Y)

#Computed once and for all !
vtx, wts = interp_weights(xy, uv)
valuesi=interpolate(values.flatten(), vtx, wts)
valuesi=valuesi.reshape(Xi.shape[0],Xi.shape[1])
print "interpolation error: ",np.mean(valuesi-np.cos(2*Xi)*np.cos(2*Yi))  
print "interpolation uncertainty: ",np.std(valuesi-np.cos(2*Xi)*np.cos(2*Yi))  

可以使用图像映射等图像变换,而且速度得到了大幅提升。

由于每次迭代新的坐标会发生变化,因此不能使用相同的函数定义,但可以一次性计算三角剖分。

import scipy.interpolate as spint
import scipy.spatial.qhull as qhull
import numpy as np
import time

# Definition of the fast  interpolation process. May be the Tirangulation process can be removed !!
def interp_tri(xy):
    tri = qhull.Delaunay(xy)
    return tri


def interpolate(values, tri,uv,d=2):
    simplex = tri.find_simplex(uv)
    vertices = np.take(tri.simplices, simplex, axis=0)
    temp = np.take(tri.transform, simplex, axis=0)
    delta = uv- temp[:, d]
    bary = np.einsum('njk,nk->nj', temp[:, :d, :], delta)  
    return np.einsum('nj,nj->n', np.take(values, vertices),  np.hstack((bary, 1.0 - bary.sum(axis=1, keepdims=True))))

m, n = 101,201
mi, ni = 101,201

[Y,X]=np.meshgrid(np.linspace(0,1,n),np.linspace(0,2,m))
[Yi,Xi]=np.meshgrid(np.linspace(0,1,ni),np.linspace(0,2,mi))

xy=np.zeros([X.shape[0]*X.shape[1],2])
xy[:,1]=Y.flatten()
xy[:,0]=X.flatten()
uv=np.zeros([Xi.shape[0]*Xi.shape[1],2])
# creation of a displacement field
uv[:,1]=0.5*Yi.flatten()+0.4
uv[:,0]=1.5*Xi.flatten()-0.7
values=np.zeros_like(X)
values[50:70,90:150]=100.

#Computed once and for all !
tri = interp_tri(xy)
t0=time.time()
for i in range(0,100):
  values_interp_Qhull=interpolate(values.flatten(),tri,uv,2).reshape(Xi.shape[0],Xi.shape[1])
t_q=(time.time()-t0)/100

t0=time.time()
values_interp_griddata=spint.griddata(xy,values.flatten(),uv,fill_value=0).reshape(values.shape[0],values.shape[1])
t_g=time.time()-t0

print "Speed-up:", t_g/t_q
print "Mean error: ",(values_interp_Qhull-values_interp_griddata).mean()
print "Standard deviation: ",(values_interp_Qhull-values_interp_griddata).std()

在我的笔记本电脑上,加速的速度可以达到20到40倍!

希望这可以帮助到某些人


interp_weights 函数在这里失败,因为 d 超出了 temp 的范围,即 delta = uv - temp[:, d] - christopherlovell

3
我有同样的问题(griddata非常慢,对于许多插值结果网格保持不变),我最喜欢的解决方案是这里描述的,主要因为它非常容易理解和应用。
它使用LinearNDInterpolator,可以传递只需计算一次的Delaunay三角剖分。从那篇帖子中复制并粘贴(所有功劳都归xdze2):
from scipy.spatial import Delaunay
from scipy.interpolate import LinearNDInterpolator

tri = Delaunay(mesh1)  # Compute the triangulation

# Perform the interpolation with the given values:
interpolator = LinearNDInterpolator(tri, values_mesh1)
values_mesh2 = interpolator(mesh2)

这样可以将我的计算速度提升大约两倍。


0

你可以尝试使用Pandas,因为它提供了高性能的数据结构。

虽然插值方法是scipy插值的包装器,但也许通过改进的数据结构可以获得更好的速度。

import pandas as pd;
wp = pd.Panel(randn(2, 5, 4));
wp.interpolate();

interpolate() 使用不同的方法填充 Panel 数据集中的 NaN 值。希望它比 Scipy 更快。

如果它不起作用,有一种方法可以提高性能(而不是使用代码的并行版本):使用Cython并实现在 Python 代码内部使用的小例程。这里有一个关于此的示例。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接