布尔矩阵计算的最快方法

6

我有一个布尔矩阵,它有1.5E6行和20E3列,类似于这个例子:

M = [[ True,  True, False,  True, ...],
     [False,  True,  True,  True, ...],
     [False, False, False, False, ...],
     [False,  True, False, False, ...],
     ...
     [ True,  True, False, False, ...]
     ]

此外,我还有另一个矩阵N1.5E6行,1列):

 N = [[ True],
      [False],
      [ True],
      [ True],
      ...
      [ True]
      ]

我需要做的是,逐列对矩阵 M 的每对列(1&1、1&2、1&3、1&N、2&1、2&2 等)进行 AND 运算,并计算结果与矩阵 N 之间有多少重叠。
我的 Python/Numpy 代码如下:
for i in range(M.shape[1]):
  for j in range(M.shape[1]):
    result = M[:,i] & M[:,j] # Combine the columns with AND operator
    count = np.sum(result & N.ravel()) # Counts the True occurrences
    ... # Save the count for the i and j pair

问题在于,使用两个for循环对20E3 x 20E3组合进行遍历计算速度非常慢(需要5-10天的时间来计算)。我尝试了一种更好的方法,那就是将每列与整个矩阵M进行比较:

for i in range(M.shape[1]):
  result = M[:,i]*M.shape[1] & M # np.tile or np.repeat is used to horizontally repeat the column
  counts = np.sum(result & N*M.shape[1], axis=0)
  ... # Save the counts

这将减少开销和计算时间约为10%,但仍需要大约1天的时间来计算。 我的问题是:有没有更快的方法(也许不是Python)来进行这些计算(基本上只是AND和SUM)? 我在考虑低级语言、GPU处理、量子计算等等,但我对这些都不太了解,所以欢迎任何关于方向的建议! 其他想法: 目前在思考是否有使用点积的快速方法来计算组合的三元组(如Davikar提出的)。
def compute(M, N):
    out = np.zeros((M.shape[1], M.shape[1], M.shape[1]), np.int32)
    for i in range(M.shape[1]):
        for j in range(M.shape[1]):
            for k in range(M.shape[1]):
                result = M[:, i] & M[:, j] & M[:, k]
                out[i, j, k] = np.sum(result & N.ravel())
    return out

1
问题中是否有使用 tensorflow 标签的原因?当您说“1.5mm”时,是指150万行吗?只是为了澄清。此外,您对 count 做什么?您是累加总和,还是每个 count 存储在单独的位置,或者其他什么操作? - jdehesa
没有标签与此理论问题相关。如果有更好的标签,请告诉我。有150万行数据。是的,我正在存储True出现的次数,np.sum返回布尔数组的计数。 - Franc Weser
是的,但我的意思是,所有“count”值都应该被累加到全局总数中,或者每对列的“count”应该被单独存储?(在您的循环中,您只是在每次迭代中替换“count”的值,因此我不确定您是否稍后会对其进行其他操作或仅仅累加它)。 - jdehesa
所以我放了点点点,这正是我在做的事情,基本上是存储每个i和j对的计数。 - Franc Weser
3个回答

8

只需使用np.einsum即可获取所有统计数 -

np.einsum('ij,ik,i->jk',M,M.astype(int),N.ravel())

可以随意尝试使用带有optimize标志的np.einsum,也可以尝试不同的数据类型转换。

要利用GPU,我们可以使用支持einsumtensorflow包。

使用np.dot更快的替代方案:

(M&N).T.dot(M.astype(int))
(M&N).T.dot(M.astype(np.float32))

时间 -

In [110]: np.random.seed(0)
     ...: M = np.random.rand(500,300)>0.5
     ...: N = np.random.rand(500,1)>0.5

In [111]: %timeit np.einsum('ij,ik,i->jk',M,M.astype(int),N.ravel())
     ...: %timeit (M&N).T.dot(M.astype(int))
     ...: %timeit (M&N).T.dot(M.astype(np.float32))
227 ms ± 191 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
66.8 ms ± 198 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
3.26 ms ± 753 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

并利用float32转换使两个布尔数组更进一步-
In [122]: %%timeit
     ...: p1 = (M&N).astype(np.float32)
     ...: p2 = M.astype(np.float32)
     ...: out = p1.T.dot(p2)
2.7 ms ± 34.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

非常感谢!理论上似乎可以工作,但某些计数比预期要高:https://repl.it/repls/PlainUnwieldyCurrency - Franc Weser
1
@FrancWeser 当您说“结果更高”时,是否意味着计数比基于循环的解决方案获得的计数更大? - Divakar
1
@FrancWeser 好的,理论上这应该是可行的。我也测试过一个随机数据 (500,300),那个也可以正常工作。你确定你已经仔细检查过了吗?你是怎么再次检查的呢? - Divakar
1
@FrancWeser 嗯,只需在 einsum 中增加一个参数 M: np.einsum('ij,ik,il,i->jkl',M,M,M.astype(int),N.ravel()) - Divakar
1
@FrancWeser 很遗憾,np.dot 只接受两个参数。因此,我们可以像之前一样使用两个矩阵进行操作。但是对于编辑后的代码所需的三个版本的 M,我们没有很好的方法。认为 einsum 是你能做的最好的选择。 - Divakar
显示剩余4条评论

6

编辑:为了修复下面代码以适应更正后的问题,只需要在compute中进行一些小的更改:

def compute(m, n):
    m = np.asarray(m)
    n = np.asarray(n)
    # Apply mask N in advance
    m2 = m & n
    # Pack booleans into uint8 for more efficient bitwise operations
    # Also transpose for better caching (maybe?)
    mb = np.packbits(m2.T, axis=1)
    # Table with number of ones in each uint8
    num_bits = (np.arange(256)[:, np.newaxis] & (1 << np.arange(8))).astype(bool).sum(1)
    # Allocate output array
    out = np.zeros((m2.shape[1], m2.shape[1]), np.int32)
    # Do the counting with Numba
    _compute_nb(mb, num_bits, out)
    # Make output symmetric
    out = out + out.T
    # Add values in diagonal
    out[np.diag_indices_from(out)] = m2.sum(0)
    # Scale by number of ones in n
    return out

我会使用 Numba 进行操作,并使用一些技巧。首先,你只需进行一半的列操作,因为另一半是重复的。其次,你可以将布尔值打包成字节,这样每个& 操作就能同时处理八个值而不是一个。第三,你可以使用多进程来并行化它。总体上,你可以像这样操作:

import numpy as np
import numba as nb

def compute(m, n):
    m = np.asarray(m)
    n = np.asarray(n)
    # Pack booleans into uint8 for more efficient bitwise operations
    # Also transpose for better caching (maybe?)
    mb = np.packbits(m.T, axis=1)
    # Table with number of ones in each uint8
    num_bits = (np.arange(256)[:, np.newaxis] & (1 << np.arange(8))).astype(bool).sum(1)
    # Allocate output array
    out = np.zeros((m.shape[1], m.shape[1]), np.int32)
    # Do the counting with Numba
    _compute_nb(mb, num_bits, out)
    # Make output symmetric
    out = out + out.T
    # Add values in diagonal
    out[np.diag_indices_from(out)] = m.sum(0)
    # Scale by number of ones in n
    out *= n.sum()
    return out

@nb.njit(parallel=True)
def _compute_nb(mb, num_bits, out):
    # Go through each pair of columns without repetitions
    for i in nb.prange(mb.shape[0] - 1):
        for j in nb.prange(1, mb.shape[0]):
            # Count common bits
            v = 0
            for k in range(mb.shape[1]):
                v += num_bits[mb[i, k] & mb[j, k]]
            out[i, j] = v

# Test
m = np.array([[ True,  True, False,  True],
              [False,  True,  True,  True],
              [False, False, False, False],
              [False,  True, False, False],
              [ True,  True, False, False]])
n = np.array([[ True],
              [False],
              [ True],
              [ True],
              [ True]])
out = compute(m, n)
print(out)
# [[ 8  8  0  4]
#  [ 8 16  4  8]
#  [ 0  4  4  4]
#  [ 4  8  4  8]]

简单对比一下,这里的小型基准测试是针对原始循环和仅使用NumPy方法的(我相信Divakar的提议是你能从NumPy中得到的最佳方案):

import numpy as np

# Original loop

def compute_loop(m, n):
    out = np.zeros((m.shape[1], m.shape[1]), np.int32)
    for i in range(m.shape[1]):
        for j in range(m.shape[1]):
            result = m[:, i] & m[:, j]
            out[i, j] = np.sum(result & n)
    return out

# Divakar methods

def compute2(m, n):
    return np.einsum('ij,ik,lm->jk', m, m.astype(int), n)

def compute3(m, n):
    return np.einsum('ij,ik->jk',m, m.astype(int)) * n.sum()

def compute4(m, n):
    return np.tensordot(m, m.astype(int),axes=((0,0))) * n.sum()

def compute5(m, n):
    return m.T.dot(m.astype(int))*n.sum()

# Make random data
np.random.seed(0)
m = np.random.rand(1000, 100) > .5
n = np.random.rand(1000, 1) > .5
print(compute(m, n).shape)
# (100, 100)

%timeit compute(m, n)
# 768 µs ± 17.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
%timeit compute_loop(m, n)
# 11 s ± 1.23 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit compute2(m, n)
# 7.65 s ± 1.06 s per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit compute3(m, n)
# 23.5 ms ± 1.53 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit compute4(m, n)
# 8.96 ms ± 194 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit compute5(m, n)
# 8.35 ms ± 266 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

这太棒了,谢谢,我要去检查一下!如Divakar的帖子中所述,我犯了一个错误,写成了“np.sum(result & n)”而不是“np.sum(result & n.ravel())”,导致错误地得出了1D2D数组的总和。预期计数是1D1D数组。我会看看能否调整您的代码来解决这个问题。 - Franc Weser
1
@FrancWeser 我也有这个怀疑,因为它看起来是一个奇怪的操作。无论如何,我已经编辑了答案并修复了这个问题。 - jdehesa

0
我建议先把 Python 搞定:将你的列转换为位字段,也将 N 转换为位字段,然后将每个三元组进行按位与操作,最后使用 (bin(num).count('1'))(或者如果 numpy 有的话,使用适当的 popcnt 函数)即可。

谢谢,做这个最好的语言/资源是什么? - Franc Weser

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接