对子数组进行向量化的numpy.unique()函数

5

我有一个形状为(N, 20, 20)的numpy数组数据,其中N是一个非常大的数字。 我想得到每个20x20子数组中唯一值的数量。使用循环可以实现:

values = []
for i in data:
    values.append(len(np.unique(i)))

如何向量化这个循环?速度是一个问题。

如果我尝试 np.unique(data),我会得到整个数据数组的唯一值,而不是每个20x20块的唯一值,所以那并不是我需要的。


你是否考虑过编写一个Fortran函数,并使用f2py进行包装?在Fortran子程序中,你可以很容易地并行化OpenMP。当我需要加速计算密集型循环时,我经常采用这种方法。 - deepak
1
另一种方法可能是使用 numba http://numba.pydata.org。它有一个向量化装饰器,我认为可能适用于这种情况。我不是 numba 的专家,所以您可能想看一下它。 - deepak
谢谢Deepak。我不会Fortran,如果必须使用另一种语言,我宁愿尝试使用Cython。我可能会探索如何使用Numba。 - martinako
1个回答

3

首先,您可以使用data.reshape(N,-1)来处理数据,因为您对最后2个维度进行排序感兴趣。

获取每行唯一值数量的简单方法是将每行转储到一个集合中并让它进行排序:

[len(set(i)) for i in data.reshape(data.shape[0],-1)]

但这只是一个迭代,可能是快速的。

“向量化”的一个问题是每行中唯一值的集合或列表长度不同。当涉及到“向量化”时,“长度不同的行”是一个警告信号。您不再拥有使大多数向量化变得可能的“矩形”数据布局。

您可以对每行进行排序:

np.sort(data.reshape(N,-1))

array([[1, 2, 2, 3, 3, 5, 5, 5, 6, 6],
       [1, 1, 1, 2, 2, 2, 3, 3, 5, 7],
       [0, 0, 2, 3, 4, 4, 4, 5, 5, 9],
       [2, 2, 3, 3, 4, 4, 5, 7, 8, 9],
       [0, 2, 2, 2, 2, 5, 5, 5, 7, 9]])

但是如何在不遍历的情况下识别每行中的唯一值? 计算非零差异的数量可能会起到作用:

In [530]: data=np.random.randint(10,size=(5,10))

In [531]: [len(set(i)) for i in data.reshape(data.shape[0],-1)]
Out[531]: [7, 6, 6, 8, 6]

In [532]: sdata=np.sort(data,axis=1)
In [533]: (np.diff(sdata)>0).sum(axis=1)+1            
Out[533]: array([7, 6, 6, 8, 6])

我本来想加一个关于浮点数的警告,但是如果您的数据可以使用np.unique,那么我的方法应该同样有效。


[(np.bincount(i)>0).sum() for i in data]

这是一种迭代解决方案,明显比我使用的 len(set(i)) 版本更快,并且与 diff...sort 相当竞争。

In [585]: data.shape Out[585]: (10000, 400)

In [586]: timeit [(np.bincount(i)>0).sum() for i in data]
1 loops, best of 3: 248 ms per loop

In [587]: %%timeit                                       
sdata=np.sort(data,axis=1)
(np.diff(sdata)>0).sum(axis=1)+1
   .....: 
1 loops, best of 3: 280 ms per loop

我刚发现了一种更快的使用bincountnp.count_nonzero的方法。

In [715]: timeit np.array([np.count_nonzero(np.bincount(i)) for i in data])
10 loops, best of 3: 59.6 ms per loop

我对速度的提升感到惊讶。但随后我想起来,count_nonzero 也在其他函数(例如 np.nonzero)中用于为返回结果分配空间。因此,这个函数被编写为最大限度地提高速度是有道理的。(它不能帮助 diff...sort 的情况,因为它不接受轴参数。)


谢谢。它确实起作用了!虽然我希望它能更快地运行。在我的数据上,它需要大约7秒钟,而我希望它可以在不到1秒钟内完成。如果没有其他更快的方法的答案,我会接受你的答案。 - martinako
“向量化”真的就是指“最快”吗? :) np.sort 大约需要 3/4 的时间;而 diff 部分只需要 1/4。按行排序所需的时间与对整个扁平数组进行排序的时间相同。 - hpaulj
我找到了一个稍微快一点的bincount版本 - 但它是按行迭代的。 - hpaulj
我尝试了bincount版本,但当我将其适应到我的脚本时,实际上比sort/diff (~7s)慢(约11秒),我需要将结果放置在numpy数组的特定部分,我不确定这是否需要很多时间。无论如何,我认为sort/diff版本纯Python可能已经是最快的了。所以我接受这个答案。我将尝试使用cython,因为我需要它在不到一秒的时间内工作。 - martinako
1
我发现count_nonzero可以大大加快bincount的解决方案。 - hpaulj
使用count_nonzero和bitcount解决方案,时间从(11s)缩短到(7s),与diff/sort解决方案的时间相似。 - martinako

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接