使用NumPy数组高效交换元素

6
假设我们有一个大矩阵 A,以及我们想要交换的两个矩阵元素的索引 (c1, r1)(c2, r2):
import numpy as np
A = np.random.rand(1000,1000)
c1, r1 = 10, 10
c2, r2 = 20, 40

实现这一功能的Pythonic方式是:

A[c1, r1], A[c2, r2] = A[c2, r2], A[c1, r1]

然而,如果您想要进行大量交换,这种方法可能会很慢。
有没有更有效的方法来交换numpy数组中的两个元素?
提前致谢。

你需要按特定顺序执行大量操作吗?(例如,交换p1/p2然后p2/p3,如果以相反的顺序应用可能会产生不同的结果) - shx2
交换中的元素是完全随机的。有时您可以以任何特定顺序应用交换并获得相同的结果,有时则不行。 - lackadaisical
2
如果您提供一个可运行的测试用例,您将得到更好的答案。它可以是迭代的。清晰地定义问题是开发更高效解决方案的第一步。 - hpaulj
2个回答

3

初步答案,不可行

您可以轻松地将交换操作向量化,通过使用索引数组(c1,r1,c2,r2)而不是迭代标量索引列表。

c1 = np.array(<all the "c1" values>, dtype=int)
r1 = np.array(<all the "r1" values>, dtype=int)
c2 = np.array(<all the "c2" values>, dtype=int)
r2 = np.array(<all the "r2" values>, dtype=int)
A[c1, r1], A[c2, r2] = A[c2, r2], A[c1, r1]

请注意,这种方法会一次性完成所有交换操作,与迭代方式不同,如果交换顺序有所不同,则结果也会不同。因此,这不是您问题的有效答案
例如,将p1与p2交换,然后将p2与p3交换,与一次性交换p1和p2以及p2和p3是不同的。在后一种情况下,p1和p3都获得了p2的原始值,而p2获取了p1和p3之间值的最后一个,即p3(根据它们在索引数组中出现的顺序)。
但是,由于您需要速度,将操作向量化(以某种方式)必须是正确的方法。

将正确性添加到上述解决方案

那么我们如何执行向量化交换,同时获得所需的输出呢?我们可以采用混合方法,将索引列表分成块(尽可能少的块),其中每个块仅包含唯一点,从而保证顺序无关紧要。对每个块进行向量化交换,并且我们只迭代块。
以下是如何实现的草图:
c1, r1 = np.array([ np.arange(10), np.arange(10) ])
c2, r2 = np.array([ [2,4,6,8,0,1,3,5,7,9], [9,1,6,8,2,2,2,2,7,0] ])
A = np.empty((15,15))

def get_chunk_boundry(c1, r1, c2, r2):
    a1 = c1 + 1j * r1
    a2 = c2 + 1j * r2
    set1 = set()
    set2 = set()
    for i, (x1, x2) in enumerate(zip(a1, a2)):
        if x1 in set2 or x2 in set1:
            return i
        set1.add(x1); set2.add(x2)
    return len(c1)

while len(c1) > 0:
    i = get_chunk_boundry(c1, r1, c2, r2)
    c1b = c1[:i]; r1b = r1[:i]; c2b = c2[:i]; r2b = r2[:i]
    print 'swapping %d elements' % i
    A[c1b,r1b], A[c2b,r2b] = A[c2b,r2b], A[c1b,r1b]
    c1 = c1[i:]; r1 = r1[i:]; c2 = c2[i:]; r2 = r2[i:]

如果一开始将索引存储为二维数组(N x 4),则在此处进行切片会更快。


1
这是一个迭代解决方案,我为参考目的编写了它(作为处理可能重复项的一种方法):
def iter2(A, rc1, rc2):
    for r,c in zip(rc1.T, rc2.T):
        j,k = tuple(r),tuple(c)
        A[j],A[k] = A[k],A[j]
    return A

For example, if:

N = 4
Aref=np.arange(N)+np.arange(N)[:,None]*10

rc1=np.array([[0,0,0],[0,3,0]])
rc2=np.array([[3,3,2],[3,0,2]])

那么

 print(iter2(A.copy(), rc1,rc2))

产生角落的交换,然后与 (2,2) 交换:

[[22  1  2 30]
 [10 11 12 13]
 [20 21 33 23]
 [ 3 31 32  0]]
< p > shx2的解决方案似乎做了同样的事情,但是对于我的测试用例,在分块函数中似乎存在错误。

对于shx2的测试(15,15)数组,我的迭代解决方案快4倍!它进行更多的交换,但每次交换的工作量较少。对于较大的数组,我希望分块会更快,但我不知道我们需要扩展到多大的规模。这也将取决于重复模式。

使用我的数组进行愚蠢的矢量化交换:

A[tuple(rc1)],A[tuple(rc2)] = A[tuple(rc2)],A[tuple(rc1)]

在这个(15,15)的例子中,它只比我的迭代解决方案快20%。显然,我们需要一个更大的测试用例来产生一些严肃的计时。
迭代解决方案在操作1维数组时更快、更简单。即使进行了raveling和reshaping,这个函数仍然是我发现的最快的。在Cython上,我没有得到太大的速度提升。(但是对于数组大小的警告仍然适用。)
def adapt_1d(A, rc1, rc2, fn=None):
    # adapt a multidim case to use a 1d iterator
    rc2f = np.ravel_multi_index(rc2, A.shape)
    rc1f = np.ravel_multi_index(rc1, A.shape)
    Af = A.flatten()
    if fn is None:
        for r,c in zip(rc1f,rc2f):
            Af[r],Af[c] = Af[c],Af[r]
    else:
        fn(Af, rc1f, rc2f)
    return Af.reshape(A.shape)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接