有没有方法可以使这个NumPy数组操作更快?

4
假设我有一个矩阵:
A = [[2, 1]
     [1, 2]]

还有一个矩阵列表:

B = [[1, 0]   C = [[2, 1],  D = [[0, 0],  E = [[1, 0],
     [1, 0]]       [0, 0]]       [0, 0]]       [0, 0]]

首先,我希望将A.flatten() = [2 1 1 2]展平。然后,对这些元素分别乘以BCDE,并将它们的和求出来。因此:

A[0] * B + A[1]*C + A[2]*D + A[3]*E

现在考虑一个更一般化的情况:
A[0] * X_1 + A[1] * X_2 + ... + A[n-1] * X_n

在这里,X_n可以有任何维度。这是我想到的代码:

import numpy as np
from functools import reduce
from operator import mul

def product(iterable):
    return reduce(mul, iterable)


def create_table(old_shape, new_shape):
    # Create X_1, X_2, ..., X_n
    lookup = []
    for _ in range(product(old_shape)):
        lookup.append(np.random.rand(*new_shape))
    return lookup


def sum_expansion(arr, lookup, shape):
    # A[0] * X_1 + ... + A[n-1] * X_n
    new_arr = np.zeros(shape)
    for i, a in enumerate(arr.flatten()):
        new_arr += a * lookup[i]

    return new_arr

if __name__ == '__main__':
    lookup = create_table((2, 2), (3, 3, 3))
    # Generate random 2 x 2 matrices.
    randos = (np.random.rand(2, 2) for _ in range(100000))
    results = map(lambda x: sum_expansion(x, lookup, (3, 3, 3)), randos)
    print(list(results))

在我的机器上执行此代码大约需要74秒。有没有办法缩短代码执行时间?


1
我怀疑那74秒中的大部分时间都花在了打印结果上。 - Paul Panzer
啊,天哪,哈哈。我想你是对的(还要再调查一下以确保)。回到绘图板上,我的另一个程序中有一个瓶颈,我本以为已经将其隔离并制作了一个最小可行性实验。谢谢! - Dair
@hpaulj:我需要平均每次运行100000次,我希望让它更快,这样每次运行就不会那么慢。 - Dair
你尝试过已发布的解决方案吗? - Divakar
@Divakar:实际程序(不是这个MVE)似乎没有多少改进。我正在尝试结合你的解决方案和waterboy的解决方案进行适应。在深入研究之前,我需要完成一些任务。这证明比我想象的要复杂一些。 - Dair
显示剩余2条评论
3个回答

2
In [20]: randos = [np.random.rand(2, 2) for _ in range(10)]

In [21]: timeit [sum_expansion(x,lookup,(3,3,3)) for x in randos]                                                       10000 loops, best of 3: 184 µs per loop  

就目前来看,这个时间还不错。每次调用sum_expansion需要18微秒。

In [22]: timeit create_table((2,2),(3,3,3))                                                                             
100000 loops, best of 3: 14.1 µs per loop      

理解你正在做的事情需要更多时间。我看到了很多Python迭代,但很少有numpy编码。


使用einsum进行乘法和求和可以获得3倍的提高:

def ein_expansion(arr, lookup, shape):                                                                                      
    return np.einsum('ij,ij...',arr, lookup) 

In [45]: L = np.array(lookup).reshape(2,2,3,3,3)

In [43]: timeit [ein_expansion(r, L,(3,3,3)) for r in randos]                                                           
10000 loops, best of 3: 58.3 µs per loop  

我们可以通过同时操作多个randos数组来获得进一步的改进。

 In [59]: timeit np.einsum('oij,ij...->o...',np.array(randos),L)                                                         
 100000 loops, best of 3: 15.8 µs per loop   

 In [60]: np.einsum('oij,ij...->o...',np.array(randos),L).shape                                                           
 Out[60]: (10, 3, 3, 3)  

谢谢。我会进一步研究这个。我对einsum不熟悉,需要一点时间来理解。 - Dair
(randos[0][:,:,None,None,None]*L).sum((0,1)) 做的是同样的事情,但速度不够快。 - hpaulj

2
这可以通过在正确重塑的数组上使用爱因斯坦求和比较简单地实现:
import numpy as np


def do_sum(x, mat_lst):
    a = np.array(x).flatten().reshape(1, -1)
    print('A shape: ', a.shape)
    b = np.stack(mat_lst)
    print('B shape: ', b.shape)
    return np.einsum('ij,jkl->kl', a, b)

A = [[1,2],[3,4]]
B = [[[1,1],[1,1]],[[2,2],[2,2]],[[3,3],[3,3]],[[4,4],[4,4]]]

do_sum(A,B)

输出

A shape:  (1, 4)
B shape:  (4, 2, 2)

[[30 30]
 [30 30]]

编辑 - 通用情况

这适用于 n 维输入数组的列表。唯一的先决条件是 x 中的元素数量应该等于 mat_lst 的长度。

def do_sum(x, mat_lst):
    a = np.array(x).flatten()
    b = np.stack(mat_lst)
    print("A shape: {}\nB shape: {}".format(a.shape, b.shape))
    return np.einsum('i,i...', a, b)

A = [[1,2],[3,4]]
B = [np.random.rand(2,2,2) for _ in range(4)]
do_sum(A,B)

(注:之前我改变扁平化数组的形状并没有必要,只是为了更好地理解爱因斯坦求和(在我看来,一个(1x3)矩阵比一个(3,)矩阵更容易想象)。所以,我已经将其移除。)
爱因斯坦约定意味着对于每个操作数重复的指数求和。在我们的广义情况下,两个矩阵的形状分别为a.shape = (n,)b.shape = (n,...),我们希望仅对ab的第一维求和。我们不关心b中其他维度的深度或数量,因此我们使用...作为其余维度的通配符。求和维度从输出数组中消失,因此输出数组包含所有其他维度(即...)。
传递给einsum的下标字符串捕获了所有这些信息。在字符串的输入侧(->左侧的所有内容),我们用逗号分隔每个操作数(即输入矩阵ab)的索引。需要求和的指数被重复(即i)。在字符串的输出侧(->右侧),我们指示输出索引。我们的函数不需要输出字符串,因为我们想输出所有未包含在求和中的维度(我想是这样的)。

这适用于更高维度的矩阵吗? - Dair
是的 - 请参见编辑部分以获取一般情况和一些额外的评论。 - Crispin

1
对于多维数组的求和缩减,我认为我们可以建议在将 randos 重塑以将最后两个轴合并为一个后使用 np.tensordot
np.tensordot(np.array(randos).reshape(-1,4),lookup, axes=((-1),(0)))

这里有另一个与使用np.tensordot不同的方法,通过重新塑形第二个数组来实现 -

lookup_arr = np.asarray(lookup).reshape(2,2,3,3,3)
out = np.tensordot(randos,lookup_arr,axes=((-2,-1),(0,1)))

运行时测试 -

In [69]: randos = [np.random.rand(2, 2) for _ in range(100)]

In [73]: lookup = create_table((2, 2), (3, 3, 3))

In [74]: lookup_arr = np.asarray(lookup).reshape(2,2,3,3,3)

In [75]: out1 = np.tensordot(np.array(randos).reshape(-1,4),lookup, axes=((-1),(0)))
    ...: out2 = np.tensordot(randos,lookup_arr,axes=((-2,-1),(0,1)))
    ...: 

In [76]: np.allclose(out1, out2)
Out[76]: True

In [77]: %timeit np.tensordot(np.array(randos).reshape(-1,4),\
                                      lookup, axes=((-1),(0)))
10000 loops, best of 3: 37 µs per loop

In [78]: %timeit np.tensordot(randos,lookup_arr,axes=((-2,-1),(0,1)))
10000 loops, best of 3: 33.3 µs per loop

In [79]: %timeit np.asarray(lookup).reshape(2,2,3,3,3)
100000 loops, best of 3: 2.18 µs per loop

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接