对称稀疏矩阵的高效切片

5

我有一组稀疏对称矩阵sigma,满足以下条件:

len(sigma) = N

对于所有的i,j,k

sigma[i].shape[0] == sigma[i].shape[1] = m  # Square
sigma[i][j,k] == sigma[i][k,j]  # Symmetric

我有一个索引数组P,其定义如下:
P.shape[0] = N
P.shape[1] = k

我的目标是通过给定的索引P[i,:],提取sigma[i]k x k密集子矩阵。可以按照以下步骤完成:

sub_matrices = np.empty([N,k,k])
for i in range(N):
    sub_matrices[i,:,:] = sigma[i][np.ix_(P[i,:], P[i,:])].todense()

请注意,虽然k的值比较小,但N(和m)非常大。如果将稀疏对称矩阵存储为CSR格式,则会花费很长时间。我认为必须有更好的解决方案。例如,是否有一种稀疏格式适用于需要在两个维度上切片的对称矩阵?
我正在使用Python,但愿意接受任何可以使用Cython进行接口处理的C库建议。
额外提示:
请注意,我的当前Cython方法如下:
cimport cython
import numpy as np
cimport numpy as np

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
                           long[:,:] P,
                           double[:,:,:] sub_matrices):
    """
    Inputs:
        sigma: A list (N,) of sparse sp.csr_matrix (m x m)
        P: A 2D array of integers (N, k)
        sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
    """
    # Create variables for keeping code tidy
    cdef long N = P.shape[0]
    cdef long k = P.shape[1]

    cdef long i
    cdef long j
    cdef long index_pointer 
    cdef long sparse_row_pointer

    # Create objects for holding sparse matrix data
    cdef double[:] data
    cdef long[:] indices
    cdef long[:] indptr

    # Object for the ordered P
    cdef long[:] perm

    # Make sure sub_matrices is all 0
    sub_matrices[:] = 0

    for i in range(N):
        # Sort the P
        perm = np.argsort(P[i,:])

        # Get the sparse matrix values
        data     = sigma[i].data
        indices  = sigma[i].indices.astype(long)
        indptr   = sigma[i].indptr.astype(long)

        for j in range(k):
            # Loop over row P[i, perm[j]] in sigma searching for values
            # in P[i, :] vector i.e. compare
            #     sigma[P[i, perm[j], :]
            # against
            #     P[i,:]

            # To do this we need our sparse row vector with columns 
            #     indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # and data/values
            #     data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # which comes from the csr matrix format.
            # We also need our sorted indexing vector
            #     P[i, perm[:]]

            # We begin by pointing at the top of both
            # our vectors and gradually move down them. In the event of 
            # an equality we add the data to sub_matrices[i,:,:] and 
            # increment the INDEXING VECTOR pointer, not the sparse
            # row vector pointer, as there can be multiple values that 
            # are the same in the indexing vector but not the sparse row
            # column vector (only 1 column can appear in 1 row!).
            index_pointer = 0
            sparse_row_pointer = indptr[P[i, perm[j]]]

            while ((index_pointer < k) and (sparse_row_pointer < indptr[P[i, perm[j]] + 1])):
                if indices[sparse_row_pointer] == P[i, perm[index_pointer]]:
                    # We can add data to sub_matrices
                    sub_matrices[i, perm[j], perm[index_pointer]] = \
                           data[sparse_row_pointer]

                    # Only increment the index pointer
                    index_pointer += 1
                elif indices[sparse_row_pointer] > P[i, perm[index_pointer]]:
                    # Need to increment index pointer
                    index_pointer += 1
                else:
                    # Need to increment sparse row pointer
                    sparse_row_pointer += 1

我认为当对相对较小的向量经常调用时,np.argsort 可能效率不高,我希望使用 C 实现进行替换。我也没有利用可能加速 N 稀疏矩阵的并行处理。不幸的是,在外部循环中存在 Python 强制转换,因此我不知道如何使用 prange
另一个要注意的问题是,Cython 方法似乎使用了大量内存,但我不知道在哪里分配了内存。
最新版本
根据 ead 的建议,以下是 Cython 代码的最新版本。
cimport cython
import numpy as np
cimport numpy as np

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
                           np.ndarray[np.int32_t, ndim=2] P,
                           np.float64_t[:,:,:] sub_matrices,
                           int symmetric):
    """
    Inputs:
        sigma: A list (N,) of sparse sp.csr_matrix (m x m)
        P: A 2D array of integers (N, k)
        sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
        symmetric: 1 if the sigma matrices are symmetric
    """
    # Create variables for keeping code tidy
    cdef np.int32_t N = P.shape[0]
    cdef np.int32_t k = P.shape[1]

    cdef np.int32_t i
    cdef np.int32_t j
    cdef np.int32_t index_pointer 
    cdef np.int32_t sparse_row_pointer

    # Create objects for holding sparse matrix data
    cdef np.float64_t[:] data
    cdef np.int32_t[:] indices

    cdef np.int32_t[:] indptr

    # Object for the ordered P
    cdef np.int32_t[:,:] perm = np.argsort(P, axis=1).astype(np.int32)

    # Make sure sub_matrices is all 0
    sub_matrices[:] = 0

    for i in range(N):
        # Get the sparse matrix values
        data     = sigma[i].data
        indices  = sigma[i].indices
        indptr   = sigma[i].indptr

        for j in range(k):
            # Loop over row P[i, perm[j]] in sigma searching for values
            # in P[i, :] vector i.e. compare
            #     sigma[P[i, perm[j], :]
            # against
            #     P[i,:]

            # To do this we need our sparse row vector with columns 
            #     indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # and data/values
            #     data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # which comes from the csr matrix format.
            # We also need our sorted indexing vector
            #     P[i, perm[:]]

            # We begin by pointing at the top of both
            # our vectors and gradually move down them. In the event of 
            # an equality we add the data to sub_matrices[i,:,:] and 
            # increment the INDEXING VECTOR pointer, not the sparse
            # row vector pointer, as there can be multiple values that 
            # are the same in the indexing vector but not the sparse row
            # column vector (only 1 column can appear in 1 row!).

            if symmetric:
                index_pointer = j  # Only search upper triangular
            else:
                index_pointer = 0
            sparse_row_pointer = indptr[P[i, perm[i, j]]]

            while ((index_pointer < k) and (sparse_row_pointer < indptr[P[i, perm[i, j]] + 1])):
                if indices[sparse_row_pointer] == P[i, perm[i, index_pointer]]:
                    # We can add data to sub_matrices
                    sub_matrices[i, perm[i, j], perm[i, index_pointer]] = \
                           data[sparse_row_pointer]

                    if symmetric:
                        sub_matrices[i, perm[i, index_pointer], perm[i, j]] = \
                               data[sparse_row_pointer]

                    # Only increment the index pointer
                    index_pointer += 1
                elif indices[sparse_row_pointer] > P[i, perm[i, index_pointer]]:
                    # Need to increment index pointer
                    index_pointer += 1
                else:
                    # Need to increment sparse row pointer
                    sparse_row_pointer += 1

并行版本

以下是一个并行版本,虽然看起来没有提供任何速度优势,而且代码也不再像之前那么漂亮:

# See https://dev59.com/V6nka4cB1Zd3GeqPIRSa
cimport cython
import numpy as np
cimport numpy as np
from libc.stdlib cimport malloc, free
from cython.parallel import prange

@cython.boundscheck(False) # turn off bounds-checking for entire function
cpdef sparse_slice_fast_cy(sigma,
                           np.ndarray[np.int32_t, ndim=2] P,
                           np.float64_t[:,:,:] sub_matrices,
                           int symmetric):
    """
    Inputs:
        sigma: A list (N,) of sparse sp.csr_matrix (m x m)
        P: A 2D array of integers (N, k)
        sub_matrices: A 3D array of doubles (N, k, k) containing the slicing
        symmetric: 1 if the sigma matrices are symmetric
    """
    # Create variables for keeping code tidy
    cdef np.int32_t N = P.shape[0]
    cdef np.int32_t k = P.shape[1]

    cdef np.int32_t i
    cdef np.int32_t j
    cdef np.int32_t index_pointer 
    cdef np.int32_t sparse_row_pointer

    # Create objects for holding sparse matrix data
    cdef np.float64_t[:] data_mem_view
    cdef np.int32_t[:] indices_mem_view
    cdef np.int32_t[:] indptr_mem_view

    cdef np.float64_t **data = <np.float64_t **> malloc(N * sizeof(np.float64_t *))
    cdef np.int32_t **indices = <np.int32_t **> malloc(N * sizeof(np.int32_t *))
    cdef np.int32_t **indptr = <np.int32_t **> malloc(N * sizeof(np.int32_t *))

    for i in range(N):
        data_mem_view = sigma[i].data
        data[i] = &(data_mem_view[0])

        indices_mem_view = sigma[i].indices
        indices[i] = &(indices_mem_view[0])

        indptr_mem_view = sigma[i].indptr
        indptr[i] = &(indptr_mem_view[0])

    # Object for the ordered P
    cdef np.int32_t[:,:] perm = np.argsort(P, axis=1).astype(np.int32)

    # Make sure sub_matrices is all 0
    sub_matrices[:] = 0

    for i in prange(N, nogil=True):
        for j in range(k):
            # Loop over row P[i, perm[j]] in sigma searching for values
            # in P[i, :] vector i.e. compare
            #     sigma[P[i, perm[j], :]
            # against
            #     P[i,:]
            # To do this we need our sparse row vector with columns 
            #     indices[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # and data/values
            #     data[indptr[P[i, perm[j]]], indptr[P[i, perm[j]]+1]]
            # which comes from the csr matrix format.
            # We also need our sorted indexing vector
            #     P[i, perm[:]]

            # We begin by pointing at the top of both
            # our vectors and gradually move down them. In the event of 
            # an equality we add the data to sub_matrices[i,:,:] and 
            # increment the INDEXING VECTOR pointer, not the sparse
            # row vector pointer, as there can be multiple values that 
            # are the same in the indexing vector but not the sparse row
            # column vector (only 1 column can appear in 1 row!).

            if symmetric:
                index_pointer = j  # Only search upper triangular
            else:
                index_pointer = 0
            sparse_row_pointer = indptr[i][P[i, perm[i, j]]]

            while ((index_pointer < k) and 
                   (sparse_row_pointer < indptr[i][P[i, perm[i, j]] + 1])):
                if indices[i][sparse_row_pointer] == P[i, perm[i, index_pointer]]:
                    # We can add data to sub_matrices
                    sub_matrices[i, perm[i, j], perm[i, index_pointer]] = \
                           data[i][sparse_row_pointer]

                    if symmetric:
                        sub_matrices[i, perm[i, index_pointer], perm[i, j]] = \
                               data[i][sparse_row_pointer]

                    # Only increment the index pointer
                    index_pointer = index_pointer + 1
                elif indices[i][sparse_row_pointer] > P[i, perm[i, index_pointer]]:
                    # Need to increment index pointer
                    index_pointer = index_pointer + 1
                else:
                    # Need to increment sparse row pointer
                    sparse_row_pointer = sparse_row_pointer + 1

    # Free malloc'd data
    free(data)
    free(indices)
    free(indptr)

测试

要运行代码进行测试

cythonize -i sparse_slice.pyx

其中 sparse_slice.pyx 是文件名。然后你可以使用这个脚本:

import time
import numpy as np
import scipy as sp
import scipy.sparse
from sparse_slice import sparse_slice_fast_cy

k = 100
N = 20000
m = 10000
samples = 20

# Create sigma matrices
## The sampling of random sparse takes a while so just do a few and 
## then populate with these.
now = time.time()
sigma_samples = []
for i in range(samples):
    sigma_samples.append(sp.sparse.rand(m, m, density=0.001, format='csr'))
    sigma_samples[-1] = sigma_samples[-1] + sigma_samples[-1].T  # Symmetric

## Now make the sigma list from these.
sigma = []
for i in range(N):
    j = np.random.randint(samples)
    sigma.append(sigma_samples[j])
print('Time to make sigma: {}'.format(time.time() - now))

# Create indexer
now = time.time()
P = np.empty([N, k]).astype(int)
for i in range(N):
    P[i, :] = np.random.choice(np.arange(m), k, replace=True)
print('Time to make P: {}'.format(time.time() - now))

# Create objects for holding the slices
sub_matrices_slow = np.empty([N, k, k])
sub_matrices_fast = np.empty([N, k, k])

# Run both slicings
## Slow
now = time.time()
for i in range(N):
    sub_matrices_slow[i,:,:] = sigma[i][np.ix_(P[i,:], P[i,:])].todense()
print('Time to make sub_matrices_slow: {}'.format(time.time() - now))

## Fast
symmetric = 1
now = time.time()
sparse_slice_fast_cy(sigma, P.astype(np.int32), sub_matrices_fast, symmetric)
print('Time to make sub_matrices_fast: {}'.format(time.time() - now))

assert(np.all((sub_matrices_slow - sub_matrices_fast)**2 < 1e-6))

1
Scipy稀疏矩阵在对称矩阵上并没有特殊处理。csr矩阵索引实际上是通过矩阵乘法来执行的。你了解这些矩阵是如何存储的吗?将sigma[i]转换为密集矩阵,然后进行索引可能会更快。 - hpaulj
如果您分享您的Cython方法,您肯定会得到提示,以改进(如果可能的话)。这将比一般的“我想要一个快速解决方案”更好。 - ead
好的,我会先整理一下,然后分享给你。不幸的是,这很复杂(至少对我来说是),但幸运的是不太长。 - rwolst
当你说“HUGE”时,你指的是什么?因子2、10、100? - ead
这很难测量,因为我只是在查看htop(如果使用太大的值,系统会锁定)。对于测试脚本中的当前值,它比Python程序中声明的多使用约3GB。但我不知道是什么在Cython代码中分配了内存。将N增加到约50000会导致我内存不足。虽然我刚才想到可能是numpy没有为sub_matrices_fast = np.empty([N, k, k])分配内存,然后在Cython中使用sub_matrices[:] = 0时发生了这种情况。 - rwolst
显示剩余4条评论
1个回答

2

目前无法进行测试,但有两个建议:

A)在i循环的一侧一次性对所有行进行排序:

# Object for the ordered P
cdef long[:,:] perm = np.argsort(P, axis=1)

也许你需要传递P作为np.ndarray [np.int64_t,ndim = 2] P(或者它的任何类型),以避免复制。您必须通过perm [i,X]而不是perm [X]访问数据。
B)定义
cdef np.int32_t[:] indices
cdef np.int32_t[:] indptr

因此,您不需要通过 '.astype' 复制数据,即

for i in range(N):
    data     = sigma[i].data
    indices  = sigma[i].indices
    indptr   = sigma[i].indptr

我认为由于sigma[i]O(m)个元素,所以复制是您的函数的瓶颈:您将获得O(N*(m+k^2))的运行时间,而不是`O(N*k^2) - 避免这种情况是很好的。
否则,该函数看起来并不太糟糕。
要使prange-loop一起使用,您应将对sigma[i]的访问移动到循环之外,方法是创建指向dataindicesindptr第一个元素的指针数组,并在廉价预处理步骤中填充它们。可以让它工作,但问题是并行化的收益有多少-可能情况是受限于内存-必须观察计时。
您还可以通过仅处理上三角矩阵来利用对称性:
  ...
  index_pointer = j #only upper triangle!
  ....
  ....
     # We can add data to sub_matrices
     #upper triangle sub-matrix:
     sub_matrices[i, perm[j], perm[index_pointer]] = \
                       data[sparse_row_pointer]
     #lower triangle sub-matrix:
     sub_matrices[i, perm[index_pointer], perm[j]] = \
                       data[sparse_row_pointer]
  ....

我会开始执行B)并查看其效果...
编辑: 关于内存使用:可以通过以下方式测量峰值内存使用率
 /usr/bin/time -f "peak_used_memory:%M(in Kb)" python test.py

我使用 N=2000 运行我的测试,并得到以下结果 (python3.6+cython0.27.1):

                             peak memory usage
only slow                       245Mb
only fast                       245Mb
slow+fast no check              402Mb
slow+fast+assert                576Mb

因此,有50Mb的开销,200Mb由任何一个函数使用,另外还需要176 Mb来评估断言。我也可以看到在其他N值时也有同样的行为。

所以我认为cython没有很大的内存使用。


这个任务很可能(至少部分)是内存绑定的,因此并行化不会有太大帮助。您应该减少加载到缓存中的内存量。

一种可能性是不使用perm——毕竟它也需要加载到缓存中。如果您能接受矩阵sigma中任何行/列置换,那么您可以做到这一点。

  1. 您可以使用排序后的P
  2. 每行元素非常少,因此每个元素的线性搜索将是可行的。
  3. 对每个元素进行二进制搜索。

我想在最好的情况下您可以获得20-30%的胜利。

有时,cython生成的代码不容易优化给c编译器,并且人们经常通过直接在C中编写然后用python包装来获得更好的结果。

但是,只有在此操作确实是程序的瓶颈时,才会执行所有这些操作。


顺便说一下,声明

cdef np.int64_t[:,:] perm = np.argsort(P, axis=1)

您不需要进行额外的复制。


很好的建议,现在Cython运行速度更快了。请注意,它仍然使用大量内存。我不明白为什么,因为现在唯一被创建的对象是“perm”。我唯一的猜测是“sigma”可能被复制了。 - rwolst
@rwolst 我没有在Cython的内存使用中看到任何特别之处 - 请参阅我的编辑以获取更多信息。 - ead
我认为如果我做数学计算,那么预期需要大量的内存。对于输出矩阵,Nkk*8(双精度)看起来是正确的。我认为这是不理解np.empty机制和惊讶地看着htop一次性分配所有内存而不是逐步缓慢分配的混合体验。 - rwolst

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接