在Numpy / Scipy中实现快速沿路径线性插值

24

假设我有来自山上三个(已知)高度的气象站数据。具体而言,每个站每分钟记录其位置的温度测量值。我想执行两种插值,并希望能够快速执行每种插值。

因此,让我们设置一些数据:

import numpy as np
from scipy.interpolate import interp1d
import pandas as pd
import seaborn as sns

np.random.seed(0)
N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
alltemps = np.array([basetemps, midtemps, toptemps]).T # note transpose!
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]

altitudes = np.array([500, 1500, 4000]).astype(float)

finaltemps = pd.DataFrame(alltemps + trend, columns=altitudes)
finaltemps.index.names, finaltemps.columns.names = ['Time'], ['Altitude']
finaltemps.plot()

好的,那么我们的温度看起来是这样的:原始温度数据

将所有时间插值到相同的高度:

我认为这个很简单。假设我想要获取每个时间在1,000海拔处的温度。我可以使用内置的scipy插值方法:

interping_function = interp1d(altitudes, finaltemps.values)
interped_to_1000 = interping_function(1000)

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_to_1000, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

静态插值温度

这很好用。现在让我们来看看速度:

%%timeit
res = interp1d(altitudes, finaltemps.values)(1000)
#-> 1000 loops, best of 3: 207 µs per loop

沿路径插值:

现在我有一个相关的第二个问题。假设我知道徒步旅行队的海拔随时间变化的情况,并且我想通过在时间上线性插值我的数据来计算他们(移动中)位置的温度。特别地,我知道徒步旅行队的位置的时间与我知道天气站温度的时间是相同的。我可以不费吹灰之力地做到这一点:

location = np.linspace(altitudes[0], altitudes[-1], N)
interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                                             for i, loc in enumerate(location)])

fig, ax = plt.subplots(1, 1, figsize=(8, 5))
finaltemps.plot(ax=ax, alpha=0.15)
ax.plot(interped_along_path, label='Interped')
ax.legend(loc='best', title=finaltemps.columns.name)

移动插值温度

这很好地解决了问题,但需要注意的是上面的关键行使用列表推导式来隐藏大量的工作。在之前的情况下,scipy为我们创建了一个单一的插值函数,并在大量数据上评估它一次。而在这种情况下,scipy实际上正在构建N个单独的插值函数,并对少量数据进行每次评估。这感觉本质上是低效的。这里潜藏着一个for循环(在列表推导式中),而且这感觉有些臃肿。

毫不奇怪,这比之前的情况要慢得多:

%%timeit
res = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                            for i, loc in enumerate(location)])
#-> 10 loops, best of 3: 145 ms per loop

因此,第二个示例的运行速度比第一个慢了1,000倍。也就是说,这符合“制作线性插值函数”步骤需要的时间在第二个示例中发生了1,000次,而在第一个示例中只发生了一次。
那么问题来了:有没有更好的方法来解决第二个问题?例如,是否有一种好的方法可以使用二维插值(也许可以处理当徒步旅行队位置已知的时间并不是温度被采样的时间的情况)?或者是否有一种特别巧妙的方法来处理时间对齐的情况?还是其他方法?

4
现在这才是如何写一个问题的正确方式! - YXD
2
谢谢!现在你可以向我展示如何写出一篇杀手级别的答案了! :) - 8one6
3个回答

11

在点 xi 处,基于位置 x1x2 的两个值 y1y2 的线性插值公式如下:

yi = y1 + (y2-y1) * (xi-x1) / (x2-x1)

使用一些向量化的Numpy表达式,我们可以从数据集中选择相关点并应用上述函数:

I = np.searchsorted(altitudes, location)

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)

问题在于一些点位于已知范围的边界上(甚至超出范围),这些应该被考虑在内:

I = np.searchsorted(altitudes, location)
same = (location == altitudes.take(I, mode='clip'))
out_of_range = ~same & ((I == 0) | (I == altitudes.size))
I[out_of_range] = 1  # Prevent index-errors

x1 = altitudes[I-1]
x2 = altitudes[I]

time = np.arange(len(alltemps))
y1 = alltemps[time,I-1]
y2 = alltemps[time,I]

xI = location

yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
yI[out_of_range] = np.nan

幸运的是,Scipy已经提供了ND插值,它同样易于处理时间不匹配的问题,例如:
from scipy.interpolate import interpn

time = np.arange(len(alltemps))

M = 150
hiketime = np.linspace(time[0], time[-1], M)
location = np.linspace(altitudes[0], altitudes[-1], M)
xI = np.column_stack((hiketime, location))

yI = interpn((time, altitudes), alltemps, xI)

下面是一个基准测试代码(实际上没有使用任何pandas,但我包含了其他答案中的解决方案):
import numpy as np
from scipy.interpolate import interp1d, interpn

def original():
    return np.array([interp1d(altitudes, alltemps[i, :])(loc)
                                for i, loc in enumerate(location)])

def OP_self_answer():
    return np.diagonal(interp1d(altitudes, alltemps)(location))

def interp_checked():
    I = np.searchsorted(altitudes, location)
    same = (location == altitudes.take(I, mode='clip'))
    out_of_range = ~same & ((I == 0) | (I == altitudes.size))
    I[out_of_range] = 1  # Prevent index-errors

    x1 = altitudes[I-1]
    x2 = altitudes[I]

    time = np.arange(len(alltemps))
    y1 = alltemps[time,I-1]
    y2 = alltemps[time,I]

    xI = location

    yI = y1 + (y2-y1) * (xI-x1) / (x2-x1)
    yI[out_of_range] = np.nan

    return yI

def scipy_interpn():
    time = np.arange(len(alltemps))
    xI = np.column_stack((time, location))
    yI = interpn((time, altitudes), alltemps, xI)
    return yI

N, sigma = 1000., 5

basetemps = 70 + (np.random.randn(N) * sigma)
midtemps = 50 + (np.random.randn(N) * sigma)
toptemps = 40 + (np.random.randn(N) * sigma)
trend = np.sin(4 / N * np.arange(N)) * 30
trend = trend[:, np.newaxis]
alltemps = np.array([basetemps, midtemps, toptemps]).T + trend
altitudes = np.array([500, 1500, 4000], dtype=float)
location = np.linspace(altitudes[0], altitudes[-1], N)

funcs = [original, interp_checked, scipy_interpn]
for func in funcs:
    print(func.func_name)
    %timeit func()

from itertools import combinations
outs = [func() for func in funcs]
print('Output allclose:')
print([np.allclose(out1, out2) for out1, out2 in combinations(outs, 2)])

在我的系统上出现了以下结果:

original
10 loops, best of 3: 184 ms per loop
OP_self_answer
10 loops, best of 3: 89.3 ms per loop
interp_checked
1000 loops, best of 3: 224 µs per loop
scipy_interpn
1000 loops, best of 3: 1.36 ms per loop
Output allclose:
[True, True, True, True, True, True]

与最快的方法相比,Scipy的interpn在速度方面略显不足,但由于其通用性和易用性,它绝对是一个不错的选择。


我希望这可以成为一个关于最佳实践的开放式对话。为此,您能添加一些时间数据吗?特别是,您能测量我在问题中描述的方法(以及我的建议答案),以及您上面提出的方法,以便每个人都可以看到相对速度吗? - 8one6
@8one6 - 你说得对,那肯定很好包含。你是否也希望将函数写成更通用的形式?在我看来,基本思路现在应该相当明显了。 - user2379410
我认为你写的很合理。我很惊讶你的代码只显示了“原始”和“OP_self_answer”之间2倍的差异,而在我的机器上,这两个函数的执行时间似乎相差10倍。我不知道为什么会这样。 - 8one6
@8one6 - 我不指望我的8年老笔记本电脑能代表性 :) 这可能是因为CPU缓存太小或RAM太慢之类的原因;我重新运行了测试,结果相同。如果您愿意,可以编辑自己的时间,脚本应该可以直接运行。 - user2379410

7
对于一个固定的时间点,您可以使用以下插值函数:
g(a) = cc[0]*abs(a-aa[0]) + cc[1]*abs(a-aa[1]) + cc[2]*abs(a-aa[2])

其中a是徒步旅行者的海拔高度,aa是带有3个测量高度的向量,cc是带有系数的向量。需要注意以下三点:

  1. 对于给定与aa相应的温度(alltemps),可以通过使用np.linalg.solve()解决线性矩阵方程来确定cc
  2. g(a)易于对(N,)维度的a和(N, 3)维度的cc进行向量化(分别包括np.linalg.solve())。
  3. g(a)被称为一阶单变量样条核函数(对于三个点)。使用abs(a-aa[i])**(2*d-1)会将样条阶数更改为d。这种方法可以被解释为机器学习中高斯过程的简化版本。

因此代码如下:

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# generate temperatures
np.random.seed(0)
N, sigma = 1000, 5
trend = np.sin(4 / N * np.arange(N)) * 30
alltemps = np.array([tmp0 + trend + sigma*np.random.randn(N)
                     for tmp0 in [70, 50, 40]])

# generate attitudes:
altitudes = np.array([500, 1500, 4000]).astype(float)
location = np.linspace(altitudes[0], altitudes[-1], N)


def doit():
    """ do the interpolation, improved version for speed """
    AA = np.vstack([np.abs(altitudes-a_i) for a_i in altitudes])
    # This is slighty faster than np.linalg.solve(), because AA is small:
    cc = np.dot(np.linalg.inv(AA), alltemps)

    return (cc[0]*np.abs(location-altitudes[0]) +
            cc[1]*np.abs(location-altitudes[1]) +
            cc[2]*np.abs(location-altitudes[2]))


t_loc = doit()  # call interpolator

# do the plotting:
fg, ax = plt.subplots(num=1)
for alt, t in zip(altitudes, alltemps):
    ax.plot(t, label="%d feet" % alt, alpha=.5)
ax.plot(t_loc, label="Interpolation")
ax.legend(loc="best", title="Altitude:")
ax.set_xlabel("Time")
ax.set_ylabel("Temperature")
fg.canvas.draw()

测量时间为:

In [2]: %timeit doit()
10000 loops, best of 3: 107 µs per loop

更新: 我用列表推导式替换了doit()中的原始代码,将速度提高了30%(对于N=1000)。

此外,根据要求进行比较,在我的机器上,@moarningsun的基准代码块:

10 loops, best of 3: 110 ms per loop  
interp_checked
10000 loops, best of 3: 83.9 µs per loop
scipy_interpn
1000 loops, best of 3: 678 µs per loop
Output allclose:
[True, True, True]

请注意,N=1000 是一个相对较小的数字。使用 N=100000 会产生以下结果:
interp_checked
100 loops, best of 3: 8.37 ms per loop

%timeit doit()
100 loops, best of 3: 5.31 ms per loop

这表明相比于“interp_checked”方法,“这种方法”在处理大规模的N时更具可扩展性。

这是一个相当新颖的解决方案。为了比较起见,您能否在同一台机器上展示其他提出的替代方案的类似执行时间结果? - 8one6
@8one6 进行了基准测试并删除了一些列表推导式以提高速度。 - Dietrich
请使用 return np.einsum('ij,ij->j', cc, np.abs(location - altitudes.reshape(-1,1))) 作为你的返回行。你还应该矢量化你的AA构建,以防止形成太多的中间变量。 - Daniel
谢谢你的提示。我尝试了你的“einsum()”代码行 - 有趣的是它比“N=100000”时慢了(6毫秒对4.25毫秒)。不太确定为什么。我尝试使用“np.vectorize()”,但没有成功生成可运行的代码。由于“AA”只有维度(3,3),我不确定并行化速度增益是否超过调用“np.vectorize()”的开销。 - Dietrich

1
我将提供一点进展。在第二种情况下(沿着路径插值),我们正在制作许多不同的插值函数。我们可以尝试制作只有一个插值函数(它在高度维度上对所有时间进行插值,就像上面第一种情况中那样),然后一次又一次地评估该函数(以矢量化方式)。这将给我们带来比我们想要的数据更多的数据(它将给我们一个 1,000 x 1,000 矩阵,而不是一个 1,000 元素向量)。但是,我们的目标结果只是沿着对角线。所以问题是,调用单个函数运行比使用简单参数制作许多函数并调用它们快吗?
答案是肯定的!
关键是由scipy.interpolate.interp1d返回的插值函数能够接受numpy.ndarray作为其输入。因此,您可以通过提供矢量输入以C速度有效地多次调用插值函数。即,这比编写一个for循环并在标量输入上一遍又一遍地调用插值函数要快得多。因此,虽然我们计算了许多我们最终扔掉的数据点,但我们通过不构造许多不常用的插值函数节省了更多时间。
old_way = interped_along_path = np.array([interp1d(altitudes, finaltemps.values[i, :])(loc) 
                                                      for i, loc in enumerate(location)])
# look ma, no for loops!
new_way = np.diagonal(interp1d(altitudes, finaltemps.values)(location)) 
# note, `location` is a vector!
abs(old_way - new_way).max()
#-> 0.0

然而:

%%timeit
res = np.diagonal(interp1d(altitudes, finaltemps.values)(location))
#-> 100 loops, best of 3: 16.7 ms per loop

所以这种方法让我们提高了10倍!有人能做得更好吗?或者提出完全不同的方法?


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接