使用scipy稀疏矩阵与一个3D的numpy数组相乘。

5

我有以下矩阵

a = sp.random(150, 150)
x = np.random.normal(0, 1, size=(150, 20))

我基本上想要实现以下公式

我可以像这样计算内部差异。
diff = (x[:, None, :] - x[None, :, :]) ** 2
diff.shape  # -> (150, 150, 20)

a.shape  # -> (150, 150)

我基本上想要广播scipy稀疏矩阵和每个内部numpy数组之间的逐元素乘法。

如果 A 允许是密集的,那么我可以简单地执行如下操作

np.einsum("ij,ijk->k", a.toarray(), (x[:, None, :] - x[None, :, :]) ** 2)

但是A是稀疏的,而且可能非常大,所以这不是一个选项。当然,我可以重新排列轴并使用for循环遍历diff数组,但是否有一种使用numpy更快的方法呢?

正如@hpaulj所指出的那样,当前的解决方案还形成了一个形状为(150, 150, 20)的数组,这也会立即导致内存问题,因此这个解决方案也不可行。


1
diffa(形状)大20倍,且密集。 - hpaulj
是的,那是真的,这也是一个巨大的问题。我会考虑到这个问题并更新问题。感谢你指出这一点! - Pavlin
即使 einsum 能够正常工作,也不会改善内存使用情况。您仍然需要计算三维 diff。我认为 np.einsum('ij,ijk->k, a.A, diff)np.sum(a.A[:,:,None]*diff), axis=(0,1)) 将具有类似的内存使用情况,相当于在临时缓冲区中复制三维 diff 一到两次。(注意,我通常不进行精细的内存使用跟踪,并且不想用潜在的内存错误来挂起我的机器。) - hpaulj
我之前不知道这一点。我很少使用einsum,这里只是为了演示我想要实现的目标。关于内存,甚至形成一个NxN矩阵也可能有问题。我希望有一个聪明的numpy技巧来计算这个方程,但也许我必须求助于numba或cython。 - Pavlin
1
你实际上想表达的是:\sigma_k = \sum_i^N \sum_j^N A_{i,j} (x_{i,k} - x_{j,k})^2 - Reinderien
显示剩余4条评论
2个回答

2
import numpy as np
import scipy.sparse
from numpy.random import default_rng

rand = default_rng(seed=0)

# \sigma_k = \sum_i^N \sum_j^N A_{i,j} (x_{i,k} - x_{j,k})^2

# Dense method
N = 100
x = rand.integers(0, 10, (N, 2))
A = np.clip(rand.integers(0, 100, (N, N)) - 80, a_min=0, a_max=None)
diff = (x[:, None, :] - x[None, :, :])**2
product = np.einsum("ij,ijk->k", A, diff)

# Loop method
s_loop = [0, 0]
for i in range(N):
    for j in range(N):
        for k in range(2):
            s_loop[k] += A[i, j]*(x[i, k] - x[j, k])**2
assert np.allclose(product, s_loop)

# For any i,j, we trivially know whether A_{i,j} is zero, and highly sparse matrices have more zeros
# than nonzeros. Crucially, do not calculate (x_{i,k} - x_{j,k})^2 at all if A_{i,j} is zero.
A_i_nz, A_j_nz = A.nonzero()
diff = (x[A_i_nz, :] - x[A_j_nz, :])**2
s_semidense = A[A_i_nz, A_j_nz].dot(diff)
assert np.allclose(product, s_semidense)

# You can see where this is going:
A_sparse = scipy.sparse.coo_array(A)
diff = (x[A_sparse.row, :] - x[A_sparse.col, :])**2
s_sparse = A_sparse.data.dot(diff)
assert np.allclose(product, s_sparse)

看起来相当快;这大约需要一秒钟完成:

N = 100_000_000
print(f'Big test: initialising a {N}x{N} array')
n_values = 10_000_000
A = scipy.sparse.coo_array(
    (
        rand.integers(0, 100, n_values),
        rand.integers(0, N, (2, n_values)),
    ),
    shape=(N, N),
)
x = rand.integers(0, 100, (N, 2))

print('Big test: calculating')
s = A.data.dot((x[A.row, :] - x[A.col, :])**2)

print(s)

谢谢!这正是我所需要的,而且完成了我想要的一切。非常聪明! - Pavlin

-2
尝试以下代码:
a = np.random.rand(150, 150)
x = np.random.normal(0, 1, size=(150, 20))
diff = (x[:, None, :] - x[None, :, :]) ** 2
diff.shape  # -> (150, 150, 20)

a.shape  # -> (150, 150)
b = np.einsum("ij,ijk->k", a, (x[:, None, :] - x[None, :, :]) ** 2)
result = b.sum()

对于稀疏矩阵,可以使用以下代码:
import scipy.sparse as sp
import numpy as np
a = sp.random(150, 150)
x = np.random.normal(0, 1, size=(150, 20))
diff = (x[:, None, :] - x[None, :, :]) ** 2
diff.shape  # -> (150, 150, 20)

a.shape  # -> (150, 150)
b = np.einsum("ij,ijk->k", a.toarray(), (x[:, None, :] - x[None, :, :]) ** 2)
result = b.sum()

其中a是一个形状为[150,150]的COO(坐标格式)稀疏矩阵


1
正如我在问题中明确说明的那样,我需要a是稀疏的。仅仅将其转换为密集型的并不是一个选项,因为内存限制。 - Pavlin
已经编辑了基于 COO(坐标格式)的稀疏矩阵代码。如果您能将稀疏矩阵转换为 COO 格式,则 einsum 代码可以正常工作。 - Suraj Kumar
1
你的“编辑”只是我的代码的复制,再次通过 a.toarray() 将稀疏矩阵转换为密集数组。而且,COO 稀疏矩阵无法与 np.einsum 协同工作。 - Pavlin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接