有没有更快的方法来添加两个二维NumPy数组?

6

假设我有两个维度相同的大型二维numpy数组(比如2000x2000)。我想对它们进行逐元素求和。我想知道是否有比np.add()更快的方法。

编辑:我添加了一个类似于我现在使用的示例。有没有办法加速这个过程?

#a and b are the two matrices I already have.Dimension is 2000x2000
#shift is also a list that is previously known
for j in range(100000):
    b=np.roll(b, shift[j] , axis=0)
    a=np.add(a,b)

将2000x2000数组相加仅涉及400万个元素,从硬件方面看,加法非常直观。我不认为通过优化此操作可以获得显着的加速。您是否担心代码的性能?这是在关键循环的中间吗? - Jonathan Wheeler
@JonathanWheeler 实际上你是正确的。它在一个循环内部,而且每秒大约会执行200次加法操作。这个加法操作单独消耗了超过总运行时间的50%。有什么建议吗? - Anirban Dutta
1
一个可能的加速方法是在计算数字时将它们相加。您是否考虑过使用ThreadingSubprocess库?(这些可能不是它们的名称) - Jonathan Wheeler
5
减少复制数组可能有助于缩短时间。例如,x += y 的速度比 z = x+y 快。 - user4322543
1
加法操作并不需要太多的优化,如果我们了解程序的更多背景信息,我们可以提供一些替代方案(例如线程),但是就目前所提出的问题而言,我真的认为没有什么需要说的了。 - Tadhg McDonald-Jensen
显示剩余2条评论
1个回答

7

方法一(向量化)

我们可以使用模运算来模拟roll/circshift的循环行为,并使用广播索引来覆盖所有行,这样就可以实现完全向量化的方法,如下所示 -

n = b.shape[0]
idx = n-1 - np.mod(shift.cumsum()[:,None]-1 - np.arange(n), n)
a += b[idx].sum(0)

方法二(循环)

b_ext = np.row_stack((b, b[:-1] ))
start_idx = n-1 - np.mod(shift.cumsum()-1,n)
for j in range(start_idx.size):
    a += b_ext[start_idx[j]:start_idx[j]+n]

冒号表示法与使用索引进行切片

这里的想法是在进入循环后尽量减少工作量。我们在进入循环之前预先计算每次迭代的起始行索引。因此,一旦进入循环,我们所需要做的就是使用冒号表示法进行切片,这是数组的一种视图并进行加和运算。这应该比需要计算所有行索引的rolling操作更好,因为后者需要进行昂贵的复制。

下面是关于使用冒号和索引进行切片时的视图和复制概念的详细说明:

In [11]: a = np.random.randint(0,9,(10))

In [12]: a
Out[12]: array([8, 0, 1, 7, 5, 0, 6, 1, 7, 0])

In [13]: a[3:8]
Out[13]: array([7, 5, 0, 6, 1])

In [14]: a[[3,4,5,6,7]]
Out[14]: array([7, 5, 0, 6, 1])

In [15]: np.may_share_memory(a, a[3:8])
Out[15]: True

In [16]: np.may_share_memory(a, a[[3,4,5,6,7]])
Out[16]: False

运行时测试

函数定义 -

def original_loopy_app(a,b):
    for j in range(shift.size):
        b=np.roll(b, shift[j] , axis=0)
        a += b

def vectorized_app(a,b):
    n = b.shape[0]
    idx = n-1 - np.mod(shift.cumsum()[:,None]-1 - np.arange(n), n)
    a += b[idx].sum(0)

def modified_loopy_app(a,b):
    n = b.shape[0]
    b_ext = np.row_stack((b, b[:-1] ))
    start_idx = n-1 - np.mod(shift.cumsum()-1,n)
    for j in range(start_idx.size):
        a += b_ext[start_idx[j]:start_idx[j]+n]

案例#1:

In [5]: # Setup input arrays
   ...: N = 200
   ...: M = 1000
   ...: a = np.random.randint(11,99,(N,N))
   ...: b = np.random.randint(11,99,(N,N))
   ...: shift = np.random.randint(0,N,M)
   ...: 

In [6]: original_loopy_app(a1,b1)
   ...: vectorized_app(a2,b2)
   ...: modified_loopy_app(a3,b3)
   ...: 

In [7]: np.allclose(a1, a2) # Verify results
Out[7]: True

In [8]: np.allclose(a1, a3) # Verify results
Out[8]: True

In [9]: %timeit original_loopy_app(a1,b1)
   ...: %timeit vectorized_app(a2,b2)
   ...: %timeit modified_loopy_app(a3,b3)
   ...: 
10 loops, best of 3: 107 ms per loop
10 loops, best of 3: 137 ms per loop
10 loops, best of 3: 48.2 ms per loop

案例 #2:

In [13]: # Setup input arrays (datasets are exactly 1/10th of original sizes)
    ...: N = 200
    ...: M = 10000
    ...: a = np.random.randint(11,99,(N,N))
    ...: b = np.random.randint(11,99,(N,N))
    ...: shift = np.random.randint(0,N,M)
    ...: 

In [14]: %timeit original_loopy_app(a1,b1)
    ...: %timeit modified_loopy_app(a3,b3)
    ...: 
1 loops, best of 3: 1.11 s per loop
1 loops, best of 3: 481 ms per loop

所以,我们使用修改后的循环方法,可以获得2x+的加速!

谢谢。它正在工作。我也喜欢你处理整个事情的方式。我的方法太传统了。 - Anirban Dutta

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接