高效地累积一组稀疏的scipy矩阵

8
我有一个 O(N) x N x N 的 scipy.sparse.csr_matrix 集合,每个稀疏矩阵大约有N个元素被设置。我想把所有这些矩阵加在一起得到一个常规的 NxN numpy 数组。(N 大约是1000)。矩阵中非零元素的排列方式使得结果总和肯定不是稀疏的(实际上几乎没有零元素)。
目前,我只是做如下操作:
reduce(lambda x,y: x+y,[m.toarray() for m in my_sparse_matrices])

这个方法可以工作,但速度有点慢:当然,在那里进行的零点处理的数量是非常可怕的。

有更好的方法吗?在文档中没有明显的方法。

更新:根据user545424的建议,我尝试了稀疏矩阵求和的替代方案,并将稀疏矩阵求和到密集矩阵上。下面的代码展示了所有的方法在相同时间内运行(Python 2.6.6在amd64 Debian/Squeeze上的四核i7上)。

import numpy as np
import numpy.random
import scipy
import scipy.sparse
import time

N=768
S=768
D=3

def mkrandomsparse():
    m=np.zeros((S,S),dtype=np.float32)
    r=np.random.random_integers(0,S-1,D*S)
    c=np.random.random_integers(0,S-1,D*S)
    for e in zip(r,c):
        m[e[0],e[1]]=1.0
    return scipy.sparse.csr_matrix(m)

M=[mkrandomsparse() for i in xrange(N)]

def plus_dense():
    return reduce(lambda x,y: x+y,[m.toarray() for m in M])

def plus_sparse():
    return reduce(lambda x,y: x+y,M).toarray()

def sum_dense():
    return sum([m.toarray() for m in M])

def sum_sparse():
    return sum(M[1:],M[0]).toarray()

def sum_combo():  # Sum the sparse matrices 'onto' a dense matrix?
    return sum(M,np.zeros((S,S),dtype=np.float32))

def benchmark(fn):
    t0=time.time()
    fn()
    t1=time.time()
    print "{0:16}:  {1:.3f}s".format(fn.__name__,t1-t0)

for i in xrange(4):
    benchmark(plus_dense)
    benchmark(plus_sparse)
    benchmark(sum_dense)
    benchmark(sum_sparse)
    benchmark(sum_combo)
    print

并注销

plus_dense      :  1.368s
plus_sparse     :  1.405s
sum_dense       :  1.368s
sum_sparse      :  1.406s
sum_combo       :  1.039s

虽然你可以通过调整N、S、D参数中的一个方法或另一个方法,使其超前约2倍......但与考虑应该能够跳过的零添加数量相比,你希望看到的数量级改善无法实现。

5个回答

4

如果您的矩阵非常稀疏,我认为我已经找到了一种将其加速约10倍的方法。

In [1]: from scipy.sparse import csr_matrix

In [2]: def sum_sparse(m):
   ...:     x = np.zeros(m[0].shape)
   ...:     for a in m:
   ...:         ri = np.repeat(np.arange(a.shape[0]),np.diff(a.indptr))
   ...:         x[ri,a.indices] += a.data
   ...:     return x
   ...: 

In [6]: m = [np.zeros((100,100)) for i in range(1000)]

In [7]: for x in m:
   ...:     x.ravel()[np.random.randint(0,x.size,10)] = 1.0
   ...:     

        m = [csr_matrix(x) for x in m]

In [17]: (sum(m[1:],m[0]).todense() == sum_sparse(m)).all()
Out[17]: True

In [18]: %timeit sum(m[1:],m[0]).todense()
10 loops, best of 3: 145 ms per loop

In [19]: %timeit sum_sparse(m)
100 loops, best of 3: 18.5 ms per loop

啊,太棒了!这就是我期望的高效算法;只是有点可惜似乎并没有作为更高效的“内置函数”提供。很快会尝试它... - timday
是的,这取决于密度,但对我感兴趣的数字来说,通常可以提高10倍的速度。 - timday
2
太神奇了。我刚刚在其他几个我有稀疏密集交互的地方应用了同样的模式——通常是点积类型的东西——每次都获得了实质性的速度提升(2倍至3倍)。 - timday

4

@user545424已经发布了可能是最快的解决方案。在相同的精神下,更易读且速度~相同的东西…nonzero()具有各种有用的应用程序。

def sum_sparse(m):
        x = np.zeros(m[0].shape,m[0].dtype)
        for a in m:
            # old lines
            #ri = np.repeat(np.arange(a.shape[0]),np.diff(a.indptr))
            #x[ri,a.indices] += a.data
            # new line
            x[a.nonzero()] += a.data
        return x

优雅而美丽的翻译! - Minstein

1

在转换为密集矩阵之前,您不能将它们加在一起吗?

>>> sum(my_sparse_matrices[1:],my_sparse_matrices[0]).todense()

尝试了这个方法(请参见更新的问题),但速度提升并不明显(如果有的话),可能是因为随着中间结果变得更加密集,这变成了一件复杂的事情。我曾经希望将稀疏矩阵求和到一个(最初为零的)密集矩阵上会更有效率,但似乎并非如此。 - timday

1

将其转换为二维数组并使用稀疏矩阵的内置乘法。这比@user545424的方法更快。

import numpy as np
from scipy.sparse import csr_matrix

m = [np.zeros((100,100)) for i in range(1000)]
for x in m:
   x.ravel()[np.random.randint(0,x.size,10)] = 1.0

m = [csr_matrix(x) for x in m]

def sum_sparse(m):
     x = np.zeros(m[0].shape)
     for a in m:
         ri = np.repeat(np.arange(a.shape[0]),np.diff(a.indptr))
         x[ri,a.indices] += a.data
     return x

def sum_sparse2(m):
    n_idx = []
    count = 0
    data = []
    indptr = [0]
    for a in m:
        b = a.indptr
        c = np.repeat(np.arange(b.shape[0]-1), b[1:] - b[:-1])
        n_idx.append(np.ravel_multi_index((c,a.indices), dims=a.shape))
        data.append(a.data)
        count += len(a.indices)
        indptr.append(count)
    
    data = np.concatenate(data)
    indptr = np.array(indptr)
    n_idx = np.concatenate(n_idx)
    mc = csr_matrix((data, n_idx, indptr), shape=(1000,100*100))
    
    res_sum = (np.ones(1000) @ mc).reshape((100,100))
    return res_sum

%timeit -r 10 sum_sparse2(m)
#6.46 ms ± 145 µs per loop (mean ± std. dev. of 10 runs, 100 loops each)

%timeit -r 10 sum_sparse(m)
#10.3 ms ± 114 µs per loop (mean ± std. dev. of 10 runs, 100 loops each)

1

这并不是一个完整的答案(我也希望看到更详细的回答),但是你可以通过不创建中间结果来轻松获得两倍或更多的改进:

def sum_dense():
    return sum([m.toarray() for m in M])

def sum_dense2():
    return sum((m.toarray() for m in M))

在我的机器上(可能因人而异),这将导致计算速度最快。通过将求和放置在()而不是[]中,我们构建了一个生成器,而不是在求和之前构建整个列表。

谢谢,我之前没听说过“生成器表达式”http://www.python.org/dev/peps/pep-0289/。在我的测试案例中只有少量改进(约25%),但我肯定会更多地使用它们。 - timday
@timday 注意到的改进是将sum_densesum_dense2进行比较,而不是与其他方法进行比较。如果您要在算法之间进行比较,就不应该因为实现不良(在这种情况下,您正在不必要地复制数组)而惩罚特定的选择。 - Hooked

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接