Numpy矢量化滑动窗口操作

8

我有以下的numpy数组:

arr_1 = [[1,2],[3,4],[5,6]]   # 3 X 2 
arr_2 = [[0.5,0.6],[0.7,0.8],[0.9,1.0],[1.1,1.2],[1.3,1.4]]  # 5 X 2

arr_1 明显是个 3 X 2 的数组,而 arr_2 则是一个 5 X 2 的数组。

现在我想要对 arr_2 应用滑动窗口技术(窗口大小为 3),而不使用循环的方式对 arr_1 和 arr_2 进行逐元素相乘。

Example:

Multiplication 1:  np.multiply(arr_1,arr_2[:3,:])

Multiplication 2: np.multiply(arr_1,arr_2[1:4,:])

Multiplication 3: np.multiply(arr_1,arr_2[2:5,:])

我希望以某种矩阵乘法形式来实现此操作,以使其比当前的解决方案更快。当前的解决方案形式如下:

for i in (2):
   np.multiply(arr_1,arr_2[i:i+3,:])  

如果arr_2中的行数很大(数量级达到成千上万),那么这个解决方案并不具有良好的可伸缩性。

非常感谢任何帮助。

2个回答

7
我们可以使用NumPy广播以向量化的方式创建这些滑动窗口索引。然后,我们可以使用这些索引直接访问arr_2,创建一个3D数组,并与2D数组arr_1执行逐元素乘法,这将再次带来广播
因此,我们将有一个如下的向量化实现 -
W = arr_1.shape[0] # Window size
idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
out = arr_1*arr_2[idx]

运行时测试和验证结果 -

In [143]: # Input arrays
     ...: arr_1 = np.random.rand(3,2)
     ...: arr_2 = np.random.rand(10000,2)
     ...: 
     ...: def org_app(arr_1,arr_2):
     ...:     W = arr_1.shape[0] # Window size
     ...:     L = arr_2.shape[0]-W+1
     ...:     out = np.empty((L,W,arr_1.shape[1]))
     ...:     for i in range(L):
     ...:        out[i] = np.multiply(arr_1,arr_2[i:i+W,:])
     ...:     return out
     ...: 
     ...: def vectorized_app(arr_1,arr_2):
     ...:     W = arr_1.shape[0] # Window size
     ...:     idx = np.arange(arr_2.shape[0]-W+1)[:,None] + np.arange(W)
     ...:     return arr_1*arr_2[idx]
     ...: 

In [144]: np.allclose(org_app(arr_1,arr_2),vectorized_app(arr_1,arr_2))
Out[144]: True

In [145]: %timeit org_app(arr_1,arr_2)
10 loops, best of 3: 47.3 ms per loop

In [146]: %timeit vectorized_app(arr_1,arr_2)
1000 loops, best of 3: 1.21 ms per loop

1
您真是个大神!这种广播概念简直让我大开眼界!谢谢! - Nikhil

4

这是一个很好的案例,用于测试as_strided和Divakar的广播速度。

In [281]: %%timeit 
     ...: out=np.empty((L,W,arr1.shape[1]))
     ...: for i in range(L):
     ...:    out[i]=np.multiply(arr1,arr2[i:i+W,:])
     ...: 
10 loops, best of 3: 48.9 ms per loop
In [282]: %%timeit
     ...: idx=np.arange(L)[:,None]+np.arange(W)
     ...: out=arr1*arr2[idx]
     ...: 
100 loops, best of 3: 2.18 ms per loop
In [283]: %%timeit
     ...: arr3=as_strided(arr2, shape=(L,W,2), strides=(16,16,8))
     ...: out=arr1*arr3
     ...: 
1000 loops, best of 3: 805 µs per loop

创建Numpy数组而不枚举数组,更多比较这些方法的内容,请参阅。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接