开始方法
我们可以沿着第一个轴创建滑动窗口,然后使用wtd
值的范围进行张量乘法来进行求和约简。
实现大致如下 -
wtds = np.exp(-(np.arange(length) - m)**2/dss)
pnp_array3D = strided_axis0(pnp_array,len(wtds))
out = np.zeros(pnp_array.shape)
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
out /= wtds.sum()
获取滑动窗口的函数:strided_axis0
是来自这里
。
使用1D
卷积提高性能
那些与wtds
值相乘,然后进行求和约简的操作基本上是沿着第一个轴进行的卷积。因此,我们可以在axis=0
上使用scipy.ndimage.convolve1d
。由于内存效率很高,这样会快得多,因为我们不会创建巨大的滑动窗口。
实现方法如下:
from scipy.ndimage import convolve1d as conv
avgs = conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
因此,out[length-1:]
,即非零行与 avgs[:-length+1]
相同。
如果我们使用非常小的核数从 wtds
进行操作,则可能存在一些精度差异。因此,在使用此 convolution
方法时,请注意这一点。
运行时间测试
方法 -
def original_app(pnp_array, length, m, dss):
alma = np.zeros(pnp_array.shape)
wtd_sum = np.zeros(pnp_array.shape)
for l in range(len(pnp_array)):
if l >= asize:
for i in range(length):
im = i - m
wtd = np.exp( -(im * im) / dss)
alma[l] += pnp_array[l - length + i] * wtd
wtd_sum[l] += wtd
alma[l] = alma[l] / wtd_sum[l]
return alma
def vectorized_app1(pnp_array, length, m, dss):
wtds = np.exp(-(np.arange(length) - m)**2/dss)
pnp_array3D = strided_axis0(pnp_array,len(wtds))
out = np.zeros(pnp_array.shape)
out[length:] = np.tensordot(pnp_array3D,wtds,axes=((1),(0)))[:-1]
out[length-1] = wtds.dot(pnp_array[np.r_[-1,range(length-1)]])
out /= wtds.sum()
return out
def vectorized_app2(pnp_array, length, m, dss):
wtds = np.exp(-(np.arange(length) - m)**2/dss)
return conv(pnp_array, weights=wtds/wtds.sum(),axis=0, mode='wrap')
计时 -
In [470]: np.random.seed(0)
...: m,n = 1000,100
...: pnp_array = np.random.rand(m,n)
...:
...: length = 6
...: sigma = 0.3
...: offset = 0.5
...:
...: asize = length - 1
...: m = np.floor(offset * asize)
...: s = length / sigma
...: dss = 2 * s * s
...:
In [471]: %timeit original_app(pnp_array, length, m, dss)
...: %timeit vectorized_app1(pnp_array, length, m, dss)
...: %timeit vectorized_app2(pnp_array, length, m, dss)
...:
10 loops, best of 3: 36.1 ms per loop
1000 loops, best of 3: 1.84 ms per loop
1000 loops, best of 3: 684 µs per loop
In [472]: np.random.seed(0)
...: m,n = 10000,1000
In [473]: %timeit original_app(pnp_array, length, m, dss)
...: %timeit vectorized_app1(pnp_array, length, m, dss)
...: %timeit vectorized_app2(pnp_array, length, m, dss)
...:
1 loop, best of 3: 503 ms per loop
1 loop, best of 3: 222 ms per loop
10 loops, best of 3: 106 ms per loop
pnp_array
中的值相比如何。如果这些值要小得多,那么请使用 app#1。好的,它们看起来非常小,所以在这种情况下,请使用 app#1。这是我之前提到的同样的精度问题。 - Divakardef vectorized_app1(pnp_array, length, m, dss): wtds = np.exp( -(np.arange(length) - m)**2 / dss ) pnp_array3D = strided_axis0(pnp_array, len(wtds)) out = np.zeros(pnp_array.shape) out[length-1:] = np.tensordot( pnp_array3D , wtds , axes=( (1), (0) ) ) [:] out /= wtds.sum() return out
- Prokhozhii