根据@DSM的评论,我认为你的应该改为。如果是这样的话,你的循环版本是否像下面这样?
嵌套循环版本
import numpy as np
N = 5
M = 2
A = np.zeros((M,N))
B = np.random.randint(M, size=N)
C = np.random.rand(N,N)
for i in range(M):
for j in range(N):
for k in range(N):
if B[k] == i:
A[i,j] += C[j,k]
有多种方法可以将这个问题向量化。我将展示我的思考过程,但还有更有效的方法(例如@DSM的版本,在问题中识别出矩阵乘法)。
为了解释起见,以下是一种方法的演示。
向量化内部循环
让我们从重写内部k
循环开始:
for i in range(M):
for j in range(N):
A[i,j] = C[j, B == i].sum()
这句话可以更简单地表述为C[j][B == i].sum()
。我们只需要选择C
的第j
行,在该行中选择仅当B
等于i
的元素,并将它们相加。
向量化最外层循环
接下来,让我们分解外部的i
循环。现在我们将会到达一个可读性会受到影响的地方,不幸地...
i = np.arange(M)[:,np.newaxis]
mask = (B == i).astype(int)
for j in range(N):
A[:,j] = (C[j] * mask).sum(axis=-1)
这里有几种技巧。在这种情况下,我们正在遍历
A
的列。每个
A
的列是对应行的子集的总和
C
。
B
等于行索引
i
的位置,决定了
C
行的子集。
为了避免遍历 i
,我们通过向 i
添加一个新轴来创建一个 2D 数组,使得 B == i
。(如果您对此感到困惑,请查看numpy广播的文档。)换句话说:
B:
array([1, 1, 1, 1, 0])
i:
array([[0],
[1]])
B == i:
array([[False, False, False, False, True],
[ True, True, True, True, False]], dtype=bool)
我们想要做的是对
C[j]
进行两个(
M
)筛选和求和,一个用于
B == i
中的每一行。这将给我们一个由两个元素组成的向量,对应于
A
中的第
j
列。
我们不能直接通过索引
C
来实现这一点,因为结果的形状将不会保持不变,因为每一行可能具有不同数量的元素。我们将通过将
B == i
掩码乘以当前的
C
行来解决这个问题,从而在
B == i
为
False
的位置得到零,并在其为真时得到
C
当前行中的值。
为了做到这一点,我们需要将布尔数组
B == i
转换为整数:
mask = (B == i).astype(int):
array([[0, 0, 0, 0, 1],
[1, 1, 1, 1, 0]])
所以当我们将其与当前行的
C
相乘时:
C[j]:
array([ 0.19844887, 0.44858679, 0.35370919, 0.84074259, 0.74513377])
C[j] * mask:
array([[ 0. , 0. , 0. , 0. , 0.74513377],
[ 0.19844887, 0.44858679, 0.35370919, 0.84074259, 0. ]])
然后我们可以对每一行求和,得到当前列的A
(当它被赋值给A[:,j]
时,将广播为一列):
(C[j] * mask).sum(axis=-1):
array([ 0.74513377, 1.84148744])
完全向量化版本
最后,我们可以将同样的原则应用于第三个循环 j
的维度,从而打破最后一个循环:
i = np.arange(M)[:,np.newaxis,np.newaxis]
mask = (B == i).astype(int)
A = (C * mask).sum(axis=-1)
@DSM的向量化版本
正如@DSM所建议的那样,您还可以执行以下操作:
A = (B == np.arange(M)[:,np.newaxis]).dot(C.T)
这是目前来说适用于大多数
M
和
N
尺寸的最快解决方案,并且可以说是最优雅的(无论如何都比我的解决方案更优雅)。
让我们稍微分解一下。
B == np.arange(M)[:,np.newaxis]
与上面“矢量化最外层循环”部分中的
B == i
完全等价。
关键在于要认识到所有的
j
和
k
循环都等同于矩阵乘法。dot将布尔型数组
B == i
强制转换为与C相同的数据类型,所以我们不需要担心明确地将它强制转换为不同类型。
之后,我们只需对
C
的转置(一个5x5的数组)和上面的“掩码”0和1数组执行矩阵乘法,得到一个2x5的数组。
dot将利用您安装的任何优化BLAS库(例如
ATLAS
、
MKL
),因此速度非常快。
时间
对于小的
M
和
N
,差异不太明显(循环和DSM版本之间大约6倍):
M, N = 2, 5
%timeit loops(B,C,M)
10000 loops, best of 3: 83 us per loop
%timeit k_vectorized(B,C,M)
10000 loops, best of 3: 106 us per loop
%timeit vectorized(B,C,M)
10000 loops, best of 3: 23.7 us per loop
%timeit askewchan(B,C,M)
10000 loops, best of 3: 42.7 us per loop
%timeit einsum(B,C,M)
100000 loops, best of 3: 15.2 us per loop
%timeit dsm(B,C,M)
100000 loops, best of 3: 13.9 us per loop
然而,一旦
M
和
N
开始增长,它们之间的差异就变得非常显著(大约是原来的600倍)(注意单位!):
M, N = 50, 20
%timeit loops(B,C,M)
10 loops, best of 3: 50.3 ms per loop
%timeit k_vectorized(B,C,M)
100 loops, best of 3: 10.5 ms per loop
%timeit ik_vectorized(B,C,M)
1000 loops, best of 3: 963 us per loop
%timeit vectorized(B,C,M)
1000 loops, best of 3: 247 us per loop
%timeit askewchan(B,C,M)
1000 loops, best of 3: 493 us per loop
%timeit einsum(B,C,M)
10000 loops, best of 3: 134 us per loop
%timeit dsm(B,C,M)
10000 loops, best of 3: 80.2 us per loop
C[k] == i
应该是B[k] == i
吗? - DSM