在按行比较的矩阵和向量之间寻找交集

3

考虑以下内容:

tmp1 = ['a', 'b', 'c', 'd', 'e']
tmp2 = ['f', 'g', 'h', 'b', 'd']
tmp3 = ['b', 'i', 'j', 'k', 'l']
matr = np.array([tmp1, tmp2, tmp3])

matr

生成一个矩阵:
array([['a', 'b', 'c', 'd', 'e'],
   ['f', 'g', 'h', 'b', 'd'],
   ['b', 'i', 'j', 'k', 'l']], 
  dtype='|S1')

现在,我想知道与一个向量相交的每行值的总和。比如说,
vec = ['a', 'c', 'f', 'b']
[sum([y in vec for y in row]) for row in matr]

返回,
[3, 2, 1]

这是所期望的输出结果。问题在于我的'matr'实际上是≈1000000 x 2200,而我有6700个向量要进行比较。我目前的解决方案速度太慢了,需要怎样改进呢?
值得注意的是,matr中的值来自一组约30000个值,并且我拥有完整的集合。我考虑过的解决方案是,对这30000个值与每个向量建立一个字典,在矩阵中使用字典将其转换为True/False,然后按行加和。但我不确定这是否有帮助。

1
在实际使用情况下,所有元素都是单个字符吗? - Divakar
我的描述方式有误,实际上不是那么长,大约12-13个字符左右。 - John Rouhana
这些“6700个向量”长度相同吗?这些向量的典型长度是多少? - Divakar
很遗憾,它们并不是。有些只有50左右,但它们都在10到1000之间。 - John Rouhana
4个回答

2

对于作为数组的 matrvec,可以使用 np.searchsorted 来实现 -


def count_in_rowwise(matr,vec):
    sidx = vec.argsort()
    idx = np.searchsorted(vec,matr,sorter=sidx)
    idx[idx==len(vec)] = 0
    return (vec[sidx[idx]] == matr).sum(1)

如果vec比较小,我们可以进行预排序并使用它来计算行数的另一种方法,如下所示 -

最初的回答:

With a smaller vec, we can pre-sort it and use an alternative method to compute row-counts:

def count_in_rowwise_v2(matr,vec,assume_sorted=False):
    if assume_sorted==1:
        sorted_vec = vec
    else:
        sorted_vec = np.sort(vec)
    idx = np.searchsorted(sorted_vec,matr)
    idx[idx==len(sorted_vec)] = 0
    return (sorted_vec[idx] == matr).sum(1)

以上解决方案适用于通用输入(数字或字符串)。为了解决我们特定的字符串情况,我们可以通过使用np.unique将字符串转换为数字,然后重新使用count_in_rowwise/count_in_rowwise_v2,这将给我们第二种方法,如下所示 -

最初的回答适用于通用输入(数字或字符串)。为了解决我们特定的字符串情况,我们可以通过使用np.unique将字符串转换为数字,然后重新使用count_in_rowwise/count_in_rowwise_v2,这将给我们第二种方法,如下所示 -

u,ids = np.unique(matr, return_inverse=True)
out = count_in_rowwise(ids.reshape(matr.shape),ids[np.searchsorted(u,vec)])

虽然这听起来很有趣,但我对它能否更快持怀疑态度...我明天会进行基准测试,并确保回复您。 - John Rouhana
@JohnRouhana 有什么特别的原因让你持怀疑态度吗?比原来的循环更快? - Divakar
看起来好像步骤更多,所以我很惊讶它比其他建议的方法更快。我是一个生物信息学家,不是纯粹的计算机科学家,所以我不太明白为什么它比其他建议的方法快这么多。但确实如此。我对1000x2200矩阵和50个向量进行了基准测试。从我的方法转换到你的方法后,平均时间从每个向量约4.9秒降至每个向量约0.088秒。这大约是55倍的加速 - 太棒了。 - John Rouhana
1
@JohnRouhana 很高兴收到反馈!我之前一直在使用searchsorted,因为它非常高效,只需要对其排序要求进行一些额外的工作。此外,如果您正在对同一个matr针对这6700个向量进行操作,我建议您采用最后的建议:使用np.unique()预先计算u,ids,并在这6700个vec上重复使用它,以仅对每个新的vec使用count_in_rowwise(ids.reshape(matr.shape),ids[np.searchsorted(u,vec)]) - Divakar
是的,这正是我最终采取的做法。然而,对于一个1,000,000x2200矩阵来说,仍然似乎很不方便;根据我的预测,计算所有向量可能需要大约7天的时间。尽管如此,您的解决方案已经将100,000x2200的运行时间缩短到了大约18个小时,这是完全可以接受的。在大多数情况下,我并不需要运行全部一百万条数据。 - John Rouhana

2
你可以使用集合交集来加速处理。以下是比较:
使用列表推导的现有解决方案:
最初的回答:
%%timeit
print([sum([y in vec for y in row]) for row in matr])
#Output
[3,2,1]
20 µs ± 1.9 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

使用列表推导式中的集合交集提出的解决方案:

最初的回答

%%timeit
print([len(set(row).intersection(vec)) for row in matr])
#Output:
[3,2,1]
17.8 µs ± 1.46 µs per loop (mean ± std. dev. of 7 runs, 100000 loops each)

如果vec也是一个集合,我们将获得更好的效率:最初的回答。
%%timeit
vec = {'a', 'c', 'f', 'b'}
print([len(set(row).intersection(vec)) for row in matr])
#Output:
[3, 2, 1]
16.6 µs ± 1.99 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果矩阵和向量本来就是集合,那么这个解决方案比其他方案显著更有效。 - Luke DeLuccia
我可以将矩阵和向量调整为集合而不是列表;每行的所有值都是唯一的,向量也具有唯一的值。这很有前途。明天我会用一个更大的矩阵和向量进行基准测试,看看是否仍然成立。 - John Rouhana
我使用这种方法对比了我的原始方法,使用了一个更大的矩阵1000x2200和50个向量来运行基准测试。实际上,我没有看到在使用列表vec和集合vec之间有实质性的区别;通常每个向量的差异为0.0001秒,而集合vec始终更快。这种方法是对我原来的方法的重大改进;在上述矩阵大小下,我观察到了20-25倍的加速。尽管如此,其他一些方法的基准测试速度更快。 - John Rouhana
你也可以考虑加入线程或多进程来进行基准测试,从而对所有方法进行评估。这应该能够为您提供最佳的结果。 - amanb

1

这里有一个简单易懂的解决方案,使用np.isin()函数(docs):

np.sum(np.isin(matr, vec), axis=1)

作为额外奖励,如果您想要获取矩阵中哪些元素在向量中,只需使用np.isin()即可,无需求和。
>>> np.isin(matr, vec)
array([[ True,  True,  True, False, False],
       [ True, False, False,  True, False],
       [ True, False, False, False, False]])

这就是为什么沿着行求和会产生所需的输出结果。

这本来是我要尝试的东西,但事实证明系统管理员正在运行Numpy 1.11。这个版本发布于1.13之前。我打算明天去麻烦他更新一下,然后再与其他解决方案进行基准测试,看看我们的表现如何。 - John Rouhana
1
@JohnRouhana 啊,糟糕!如果你确实受限于特定numpy版本以下的解决方案,那么我建议你编辑原帖并添加一个标签来说明这个限制(为了未来的读者)。 - alkasm
我确实听到你的意见。虽然,我很高兴有了这个潜在的解决方案,并且我正在努力更新 numpy,以便我可以看看这个方法与其他方法相比如何运行。如果我无法对此进行基准测试,我将更新 OP。 - John Rouhana
1
我让它工作了;根据我的基准测试,这是第二个最佳建议的方法。 - John Rouhana

1
让我们看一下您当前算法的速度。根据Python维基百科,检查一个项是否在像y in vec这样的数组中是O(n)的,也就是说,最坏情况下,它必须遍历vec中的每个元素。由于您要为矩阵的每个元素进行该检查,因此您的总操作数为numRows * numCols * vecLen,即O(n^3)
更快的方法是为vec构建字典以优化查找,因为字典是O(1)而不是O(n),这意味着它们可以在1次操作中执行检查,无论vec有多长:
vecDict = dict([(x, 1) for x in vec])

因此,您的新时间复杂度为(numRows * numCols) + vecLen,这是O(n^2),我认为这对于您的数据来说已经是最快的了。

[sum([y in vecDict for y in row]) for row in matr]

1
这绝对是一个有价值的观点,而且这也是我在看到其他解决方案之前正在考虑的解决方案。虽然这比我最初的方法有了很大的改进(当使用一个1000x2200的矩阵时,我观察到了10-15倍的加速),但Divakar和amanb的方法似乎更快。 - John Rouhana
嘿,如果更快的话,就去做吧。我对Python的NumPy库不是很了解。可能是因为它们正在使用本机执行更多操作,所以它可能更加优化。 - frodo2975

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接