使用NumPy高效计算给定向量元素的所有成对乘积

3

我正在寻找计算给定向量元素的所有成对乘积的“最优”方法。 如果向量大小为N ,则输出将是大小为N *(N + 1)// 2 的向量,并包含所有(i,j)对的x [i] * x [j] 值,其中 i <= j 。 计算此过程的朴素方式如下:

import numpy as np

def get_pairwise_products_naive(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

要求:

  • 尽量减少内存的额外分配/使用:如果可能的话,请直接写入输出缓冲区。
  • 使用向量化的NumPy例程,而不是显式循环。
  • 避免额外(不必要)的计算。

我一直在尝试使用诸如outertriu_indiceseinsum等例程以及一些索引/视图技巧,但还没有找到符合上述要求的解决方案。


如果可能的话,请直接写入到输出缓冲区。您是指实际上已经分配了一个输出数组,还是可以多次重复使用它?在@orlp的Numba版本中,分配输出数组实际上是最耗时的部分,而不是计算本身。(仅计算12.8毫秒,计算+分配43.7毫秒)在我的系统上。 - max9111
我的意思是我想避免为任何临时数组进行不必要的分配。这意味着唯一的分配将是为输出缓冲区,它将填充计算结果。 - iheap
对不起,我一开始没有看到并行化的可能性。有一些潜力...我会添加一个并行版本。 - max9111
@iheap,这两个解决方案中有一个对您有用吗? - Divakar
我目前正在使用Numba解决方案。如果没有人提出聪明的仅基于NumPy的解决方案,我会保持问题开放一段时间。如果没有,我将在几天内接受Numba解决方案。 - iheap
https://dev59.com/brvoa4cB1Zd3GeqPzTad#62015703/ 中的 NumPy-only 解决方案怎么样?不够聪明吗? :) @iheap - Divakar
3个回答

4

我会先计算M = vTv,然后将此矩阵的下三角或上三角部分压平。

def pairwise_products(v: np.ndarray):
    assert len(v.shape) == 1
    n = v.shape[0]
    m = v.reshape(n, 1) @ v.reshape(1, n)
    return m[np.tril_indices_from(m)].ravel()

我还想提到numba,它能让你的“朴素”方法比这个更快。
import numba

@numba.njit
def pairwise_products_numba(vec: np.ndarray):
    k, size = 0, vec.size
    output = np.empty(size * (size + 1) // 2)
    for i in range(size):
        for j in range(i, size):
            output[k] = vec[i] * vec[j]
            k += 1
    return output

仅对上述pairwise_products(np.arange(5000))进行测试需要约0.3秒,而Numba版本需要约0.05秒(忽略第一次运行,它用于即时编译该函数)。


这种方法创建了两个临时数组(mtril_indices_from的返回值),对吧?我正在尝试理解是否可能省略复制。 - iheap
@iheap 是的。如果你真的想要在这里挤出一点性能,可以使用 numba - numpy(除非我错过了一个非常好的函数)在这里无法帮助你进一步。 - orlp
确实,numba 有很大的作用,可以自动地将朴素版本转换为最优版本。我会等一段时间看看是否有人能想出只使用基本 NumPy 函数的聪明方法;如果没有,我稍后会接受你的答案。 - iheap
@orlp 你可以尝试使用.ravel()而不是.flatten()来使NumPy版本更快。 - norok2
@orlp 你可以使用-1语法(arr.reshape(-1, 1)),让numpy自己推断出缺失的形状n - Quan Hoang

4

方法 #1

使用NumPy提供的向量化技术,你可以在进行外积(outer-multiplication)操作并获得所有成对乘积后,再通过遮盖(masking)方式实现向量化处理,代码如下 -

def pairwise_multiply_masking(a):
    return (a[:,None]*a)[~np.tri(len(a),k=-1,dtype=bool)]

方法二

对于非常大的输入1D数组,我们可以采用迭代slicing方法,这种方法只使用一个循环 -

def pairwise_multiply_iterative_slicing(a):
    n = len(a)
    N = (n*(n+1))//2
    out = np.empty(N, dtype=a.dtype)
    c = np.r_[0,np.arange(n,0,-1)].cumsum()
    for ii,(i,j) in enumerate(zip(c[:-1],c[1:])):
        out[i:j] = a[ii:]*a[ii]
    return out

基准测试

我们将在设置中包含 @orlp 的解决方案中的 pairwise_productspairwise_products_numba

使用benchit软件包(几个基准测试工具打包在一起;免责声明:我是其作者)来对所提出的解决方案进行基准测试。

import benchit
funcs = [pairwise_multiply_masking, pairwise_multiply_iterative_slicing, pairwise_products_numba, pairwise_products]
in_ = [np.random.rand(n) for n in [10,50,100,200,500,1000,5000]]
t = benchit.timings(funcs, in_)
t.plot(logx=True, save='timings.png')
t.speedups(-1).plot(logx=True, logy=False, save='speedups.png')

结果(相对于pairwise_products的时间和加速比) -

enter image description here

enter image description here

从图表趋势可以看出,对于非常大的数组,基于切片的方法将开始胜出,否则矢量化的方法会做得很好。

建议

  • 我们还可以查看 numexpr 以更有效地执行大数组的外部乘法。

为什么你的基准测试中没有包含 Numba 版本?我很想看到确切的数字。 - orlp
@orlp 因为看起来OP正在寻找基于NumPy的解决方案,所以我还是加上了。 - Divakar
很好,我点了个赞。我很惊讶看到pairwise_multiply_iterative_slicing开始领先。我怀疑这是因为output[k] = vec[i] * vec[j]。如果我们将vec[i]从内部循环中取出并编写m = vec[i]output[k] = m * vec[j],那么numba是否总是最快的呢? - orlp
似乎没有什么区别。但在我的机器上,numba总是最快的。 - orlp
@orlp 很棒!我猜对于你的系统来说,如果你有更多的RAM来看迭代的那个numba one被切断,你可以将其延伸到10K甚至更多?很高兴看到Benchit在其他地方也被使用 :) 对你也是一样。 - Divakar

0
你也可以并行化这个算法。如果能够一次性分配足够大的数组(对该数组的较小视图几乎不会有任何成本),然后在之后进行覆盖,就可以实现更大的加速。 示例
@numba.njit(parallel=True)
def pairwise_products_numba_2_with_allocation(vec):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)
    output = np.empty(size * (size + 1) // 2)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

@numba.njit(parallel=True)
def pairwise_products_numba_2_without_allocation(vec,output):
    k, size = 0, vec.size
    k_vec=np.empty(vec.size,dtype=np.int64)

    #precalculate the indices
    for i in range(size):
        k_vec[i] = k
        k+=(size-i)

    for i in numba.prange(size):
        k=k_vec[i]
        for j in range(size-i):
            output[k+j] = vec[i] * vec[j+i]

    return output

时间

A=np.arange(5000)
k, size = 0, A.size
output = np.empty(size * (size + 1) // 2)

%timeit res_1=pairwise_products_numba_2_without_allocation(A,output)
#7.84 ms ± 116 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_2=pairwise_products_numba_2_with_allocation(A)
#16.9 ms ± 325 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit res_3=pairwise_products_numba(A) #@orlp
#43.3 ms ± 134 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接