在索引上求2D数组值的总和

7
我需要扩展这个问题,它基于第二个数组的索引对一个数组中的值进行求和。假设A是结果数组,B是索引数组,C是要进行求和的数组。则A[i] = sum对于C的求和方式为index(B) == i
然而,我的设置是:
N = 5
M = 2

A = np.zeros((M,N))
B = np.random.randint(M, size=N) # contains indices for A
C = np.random.rand(N,N)

我需要 `A[i,j] = sum_{k in 0...N} C[j,k]`,其中 `C[k] == i`,即对 B 的索引与 i 相匹配的行求和。有没有一种有效的方法来完成这个操作?对于我的应用程序,N 大约是 10,000,M 大约是 20。在最小化问题的每次迭代中调用此操作...我的当前循环方法非常慢。
谢谢!

3
我不完全确定我理解B的角色。您能编辑一下并包含等效的循环吗? - DSM
2
哦,等等。你的 C[k] == i 应该是 B[k] == i 吗? - DSM
糟糕!抱歉,应该是B[k]而不是C[k]! - Kevin
2个回答

4

根据@DSM的评论,我认为你的应该改为。如果是这样的话,你的循环版本是否像下面这样?

嵌套循环版本

import numpy as np

N = 5
M = 2

A = np.zeros((M,N))
B = np.random.randint(M, size=N) # contains indices for A
C = np.random.rand(N,N)

for i in range(M):
    for j in range(N):
        for k in range(N):
            if B[k] == i:
                A[i,j] += C[j,k]

有多种方法可以将这个问题向量化。我将展示我的思考过程,但还有更有效的方法(例如@DSM的版本,在问题中识别出矩阵乘法)。

为了解释起见,以下是一种方法的演示。


向量化内部循环

让我们从重写内部k循环开始:

for i in range(M):
    for j in range(N):
        A[i,j] = C[j, B == i].sum()

这句话可以更简单地表述为C[j][B == i].sum()。我们只需要选择C的第j行,在该行中选择仅当B等于i的元素,并将它们相加。


向量化最外层循环

接下来,让我们分解外部的i循环。现在我们将会到达一个可读性会受到影响的地方,不幸地...

i = np.arange(M)[:,np.newaxis]
mask = (B == i).astype(int)
for j in range(N):
    A[:,j] = (C[j] * mask).sum(axis=-1)

这里有几种技巧。在这种情况下,我们正在遍历 A 的列。每个 A 的列是对应行的子集的总和 CB 等于行索引 i 的位置,决定了 C 行的子集。

为了避免遍历 i,我们通过向 i 添加一个新轴来创建一个 2D 数组,使得 B == i。(如果您对此感到困惑,请查看numpy广播的文档。)换句话说:

B:
    array([1, 1, 1, 1, 0])

i: 
    array([[0],
           [1]])

B == i:
    array([[False, False, False, False,  True],
           [ True,  True,  True,  True, False]], dtype=bool)

我们想要做的是对C[j]进行两个(M)筛选和求和,一个用于B == i中的每一行。这将给我们一个由两个元素组成的向量,对应于A中的第j列。
我们不能直接通过索引C来实现这一点,因为结果的形状将不会保持不变,因为每一行可能具有不同数量的元素。我们将通过将B == i掩码乘以当前的C行来解决这个问题,从而在B == iFalse的位置得到零,并在其为真时得到C当前行中的值。
为了做到这一点,我们需要将布尔数组B == i转换为整数:
mask = (B == i).astype(int):
    array([[0, 0, 0, 0, 1],
           [1, 1, 1, 1, 0]])

所以当我们将其与当前行的C相乘时:
C[j]:
    array([ 0.19844887,  0.44858679,  0.35370919,  0.84074259,  0.74513377])

C[j] * mask:
    array([[ 0.        ,  0.        ,  0.        ,  0.        ,  0.74513377],
           [ 0.19844887,  0.44858679,  0.35370919,  0.84074259,  0.        ]])

然后我们可以对每一行求和,得到当前列的A(当它被赋值给A[:,j]时,将广播为一列):

(C[j] * mask).sum(axis=-1):
    array([ 0.74513377,  1.84148744])

完全向量化版本

最后,我们可以将同样的原则应用于第三个循环 j 的维度,从而打破最后一个循环:

i = np.arange(M)[:,np.newaxis,np.newaxis]
mask = (B == i).astype(int)
A = (C * mask).sum(axis=-1)

@DSM的向量化版本

正如@DSM所建议的那样,您还可以执行以下操作:

A = (B == np.arange(M)[:,np.newaxis]).dot(C.T)

这是目前来说适用于大多数MN尺寸的最快解决方案,并且可以说是最优雅的(无论如何都比我的解决方案更优雅)。
让我们稍微分解一下。 B == np.arange(M)[:,np.newaxis]与上面“矢量化最外层循环”部分中的B == i完全等价。
关键在于要认识到所有的jk循环都等同于矩阵乘法。dot将布尔型数组B == i强制转换为与C相同的数据类型,所以我们不需要担心明确地将它强制转换为不同类型。
之后,我们只需对C的转置(一个5x5的数组)和上面的“掩码”0和1数组执行矩阵乘法,得到一个2x5的数组。
dot将利用您安装的任何优化BLAS库(例如ATLASMKL),因此速度非常快。

时间

对于小的MN,差异不太明显(循环和DSM版本之间大约6倍):
M, N = 2, 5

%timeit loops(B,C,M)
10000 loops, best of 3: 83 us per loop

%timeit k_vectorized(B,C,M)
10000 loops, best of 3: 106 us per loop

%timeit vectorized(B,C,M)
10000 loops, best of 3: 23.7 us per loop

%timeit askewchan(B,C,M)
10000 loops, best of 3: 42.7 us per loop

%timeit einsum(B,C,M)
100000 loops, best of 3: 15.2 us per loop

%timeit dsm(B,C,M)
100000 loops, best of 3: 13.9 us per loop

然而,一旦 MN 开始增长,它们之间的差异就变得非常显著(大约是原来的600倍)(注意单位!):
M, N = 50, 20

%timeit loops(B,C,M)
10 loops, best of 3: 50.3 ms per loop

%timeit k_vectorized(B,C,M)
100 loops, best of 3: 10.5 ms per loop

%timeit ik_vectorized(B,C,M)
1000 loops, best of 3: 963 us per loop

%timeit vectorized(B,C,M)
1000 loops, best of 3: 247 us per loop

%timeit askewchan(B,C,M)
1000 loops, best of 3: 493 us per loop

%timeit einsum(B,C,M)
10000 loops, best of 3: 134 us per loop

%timeit dsm(B,C,M)
10000 loops, best of 3: 80.2 us per loop

@DSM - 我可以验证它会给出相同的结果,但你是完全正确的。它不应该是等价的。我需要再考虑一下。 - Joe Kington
1
好吧,我是个白痴...不知怎么地,我在你的代码中删除了 axis=-1 并将其替换为 axis=1,所以无法重现任何东西也就不足为奇了。你的版本确实与我的 (B == np.arange(M)[:,None]).dot(C.T)np.einsum('ij,kj->ik', B == np.arange(M)[:,None], C) 匹配,而且我今天可能应该停止思考了... - DSM
@DSM - 不管怎样,我的原始帖子也犯了这个错误(1与-1)。我在最初的几分钟内进行了编辑,因此它不会显示为编辑,但是您的 axis=1 可能实际上来自于复制粘贴我的初始答案。 :) (感谢您提供的 einsum 提示!) - Joe Kington
@DSM - 你应该把那些发布为答案。识别固有的矩阵乘法可以使问题变得非常优雅(且更快!) - Joe Kington
@JoeKington:那确实是个办法,但我有点懒。:^) 另外,我认为教学性的答案对于提问者和未来的搜索者都更有用,而你的回答已经很好地涵盖了这个问题。 - DSM
1
йқһеёёеҘҪпјҢ@JoeгҖӮдҪ зҡ„и®Ўж—¶еә”иҜҘдҪҝз”ЁN >> MпјҢеӣ дёәOPзҡ„жғ…еҶөжҳҜN ~ 10,000е’ҢM ~ 20гҖӮ - askewchan

3
我假设@DSM发现了你的错别字,你想要:
A[i,j] = sum_{k in 0...N} C[j,k] where B[k] == i

然后您可以循环遍历i in range(M),因为M相对较小。

A = np.array([C[:,B == i].sum(axis=1) for i in range(M)])

2
比我的更直观和易读,+1。顺便提一下,在类似于numpy、matlab等的东西中,“向量化大部分内容并循环最短的轴”这种通用方法通常是速度和内存使用之间的良好折衷。 - Joe Kington

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接