在numpy.array中找到唯一的行

257

我需要在一个 numpy.array 中找到独特的行。

例如:

>>> a # I have
array([[1, 1, 1, 0, 0, 0],
       [0, 1, 1, 1, 0, 0],
       [0, 1, 1, 1, 0, 0],
       [1, 1, 1, 0, 0, 0],
       [1, 1, 1, 1, 1, 0]])
>>> new_a # I want to get to
array([[1, 1, 1, 0, 0, 0],
       [0, 1, 1, 1, 0, 0],
       [1, 1, 1, 1, 1, 0]])
我知道可以创建一个集合并循环遍历数组,但我正在寻找一个高效的纯 numpy 解决方案。我相信有一种方法可以将数据类型设置为 void,然后我就可以只使用 numpy.unique,但我无法弄清楚如何使其工作。

15
pandas有一个名为dataframe.drop_duplicates()的方法。请参见https://dev59.com/B2ct5IYBdhLWcg3wIqHW和http://pandas.pydata.org/pandas-docs/dev/generated/pandas.DataFrame.drop_duplicates.html - codeape
谢谢,但我不能使用pandas。 - Akavall
2
可能是在numpy数组的每一行中删除重复项的重复问题。 - Andy Hayden
这个怎么样?https://dev59.com/cWoy5IYBdhLWcg3wa9ff#8567929? - codeape
1
@Andy Hayden,尽管标题如此,但它并不是这个问题的重复。codeape的链接是一个重复。 - Wai Yip Tung
5
这个功能将原生地添加到1.13版本中:https://github.com/numpy/numpy/pull/7742 - Eric
20个回答

197
从NumPy 1.13开始,可以在任何N维数组中简单选择轴以选择唯一值。要获取唯一行,请使用以下方式使用np.unique
unique_rows = np.unique(original_array, axis=0)

23
小心使用这个函数。np.unique(list_cor, axis=0)会得到一个_去除重复行的数组_,它并不会过滤掉_原始数组中不唯一的元素_。例如,请参见此处 - Brad Solomon
1
请注意,如果您想忽略行中值的顺序获取唯一的行,则可以首先按列直接对原始数组进行排序:original_array.sort(axis=1) - mangecoeur
我希望有Pandas drop_duplicates()的等效方法:它不会排序(而是使用高效的哈希算法)。通常情况下不需要排序,排序会增加额外的计算量。 - Pierre D

147
另一种可能的解决方案
np.vstack({tuple(row) for row in a})

编辑:正如其他人所提到的,从NumPy 1.16开始,这种方法已经被弃用。在现代版本中,你可以这样做。
np.vstack(tuple(set(map(tuple,a))))

在 Python 代码中,“map(tuple, a)”将矩阵a的每一行转换为元组,使其可哈希化。而“set(map(tuple, a))”将所有唯一的行创建为一个集合。由于集合是非序列可迭代对象,无法直接用于构建NumPy数组。通过外部调用“tuple”函数将集合转换为元组,解决了这个问题,使其可以用于创建数组。

22
这很清晰,简短且符合Python风格。除非速度是一个真正的问题,否则我认为这类解决方案应该优先于这个问题中较复杂、得票更高的答案。+1 - Bill Cheatham
3
太好了!花括号或set()函数能够解决问题。 - Tian He
3
@Greg von Winckel,你能否提供一些不会改变顺序的建议? - Laschet Jain
可以,但不能在单个命令中完成:x=[]; [x.append(tuple(r)) for r in a if tuple(r) not in x]; a_unique = array(x); - Greg von Winckel
3
为避免FutureWarning警告,请将集合转换为列表,例如:np.vstack(list({tuple(row) for row in AIPbiased[i, :, :]}))。注意:数组堆叠需要使用“序列”类型的数据,如列表或元组。不支持使用生成器等非序列可迭代对象,该功能已在NumPy 1.16版本中弃用,并将在未来引发错误。 - leermeester

115

使用结构化数组的另一个选项是使用一个 void 类型的视图,将整行合并为单个项目:

a = np.array([[1, 1, 1, 0, 0, 0],
              [0, 1, 1, 1, 0, 0],
              [0, 1, 1, 1, 0, 0],
              [1, 1, 1, 0, 0, 0],
              [1, 1, 1, 1, 1, 0]])

b = np.ascontiguousarray(a).view(np.dtype((np.void, a.dtype.itemsize * a.shape[1])))
_, idx = np.unique(b, return_index=True)

unique_a = a[idx]

>>> unique_a
array([[0, 1, 1, 1, 0, 0],
       [1, 1, 1, 0, 0, 0],
       [1, 1, 1, 1, 1, 0]])

编辑 根据@seberg的建议,添加了np.ascontiguousarray。如果数组不是连续的,则这将使该方法变慢。

编辑 通过执行以下操作,上述方法可以略微加快速度,也许会损失一些清晰度:

unique_a = np.unique(b).view(a.dtype).reshape(-1, a.shape[1])

在我的系统上,表现方面至少与lexsort方法相当甚至更好:

a = np.random.randint(2, size=(10000, 6))

%timeit np.unique(a.view(np.dtype((np.void, a.dtype.itemsize*a.shape[1])))).view(a.dtype).reshape(-1, a.shape[1])
100 loops, best of 3: 3.17 ms per loop

%timeit ind = np.lexsort(a.T); a[np.concatenate(([True],np.any(a[ind[1:]]!=a[ind[:-1]],axis=1)))]
100 loops, best of 3: 5.93 ms per loop

a = np.random.randint(2, size=(10000, 100))

%timeit np.unique(a.view(np.dtype((np.void, a.dtype.itemsize*a.shape[1])))).view(a.dtype).reshape(-1, a.shape[1])
10 loops, best of 3: 29.9 ms per loop

%timeit ind = np.lexsort(a.T); a[np.concatenate(([True],np.any(a[ind[1:]]!=a[ind[:-1]],axis=1)))]
10 loops, best of 3: 116 ms per loop

3
非常感谢。这就是我寻找的答案,能否解释一下这一步骤的操作:b = a.view(np.dtype((np.void, a.dtype.itemsize * a.shape[1])))?该代码段中的语句将多维数组a转换为一维数组b。具体来说,它使用NumPy的view方法和void数据类型将a中的每个元素作为字节串(byte string)来表示,并将结果存储在b中。这个过程通常被称为数组扁平化。 - Akavall
3
它创建了一个数据视图,其中使用np.void数据类型,其大小与完整行中的字节数相同。这类似于如果您有一个由np.uint8数组组成,并将其视为np.uint16,这会将每两列合并为一列,但更加灵活。 - Jaime
3
@Jaime,请您添加一个np.ascontiguousarray或类似的方法,以确保代码的安全性(我知道这可能比必要的限制多一点,但是...)。为了保证视图正常工作,行必须是连续的。 - seberg
2
@ConstantineEvans 这是最近的更新:在numpy 1.6中,尝试对np.void数组运行np.unique会返回一个与mergesort未实现该类型有关的错误。但在1.7中可以正常工作。 - Jaime
11
值得注意的是,如果对浮点数使用此方法,则有一个要注意的问题,即“-0.”将不等于“+0.”,而逐个比较元素会将“-0.==+0.”(如IEEE浮点标准所规定)。请参见https://dev59.com/118d5IYBdhLWcg3weiF-。 - tom10
显示剩余10条评论

32
如果你想避免将数组转换为元组序列或其他类似的数据结构所带来的内存开销,你可以利用NumPy的结构化数组。
关键是将原始数组视为结构化数组,其中每个条目对应于原始数组的一行。 这不会复制数组,并且非常高效。
以下是一个快速示例:
import numpy as np

data = np.array([[1, 1, 1, 0, 0, 0],
                 [0, 1, 1, 1, 0, 0],
                 [0, 1, 1, 1, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 1, 1, 0]])

ncols = data.shape[1]
dtype = data.dtype.descr * ncols
struct = data.view(dtype)

uniq = np.unique(struct)
uniq = uniq.view(data.dtype).reshape(-1, ncols)
print uniq

为了理解正在发生的事情,请查看中间结果。

一旦我们把事物视为一个结构化数组,数组中的每个元素都是原始数组中的一行。(基本上,它是类似于元组列表的数据结构。)

In [71]: struct
Out[71]:
array([[(1, 1, 1, 0, 0, 0)],
       [(0, 1, 1, 1, 0, 0)],
       [(0, 1, 1, 1, 0, 0)],
       [(1, 1, 1, 0, 0, 0)],
       [(1, 1, 1, 1, 1, 0)]],
      dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', '<i8'), ('f3', '<i8'), ('f4', '<i8'), ('f5', '<i8')])

In [72]: struct[0]
Out[72]:
array([(1, 1, 1, 0, 0, 0)],
      dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', '<i8'), ('f3', '<i8'), ('f4', '<i8'), ('f5', '<i8')])

运行numpy.unique后,我们将获得一个结构化数组:

In [73]: np.unique(struct)
Out[73]:
array([(0, 1, 1, 1, 0, 0), (1, 1, 1, 0, 0, 0), (1, 1, 1, 1, 1, 0)],
      dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', '<i8'), ('f3', '<i8'), ('f4', '<i8'), ('f5', '<i8')])

我们需要将其视为一个“普通”数组(_ 存储了在 ipython 中最后一次计算的结果,这就是为什么你会看到 _.view...):

In [74]: _.view(data.dtype)
Out[74]: array([0, 1, 1, 1, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0])

然后将其重新塑造为2D数组(-1是一个占位符,告诉numpy计算正确的行数,给出列数):

In [75]: _.reshape(-1, ncols)
Out[75]:
array([[0, 1, 1, 1, 0, 0],
       [1, 1, 1, 0, 0, 0],
       [1, 1, 1, 1, 1, 0]])
显然,如果你想更简洁地表达,可以写成:
import numpy as np

def unique_rows(data):
    uniq = np.unique(data.view(data.dtype.descr * data.shape[1]))
    return uniq.view(data.dtype).reshape(-1, data.shape[1])

data = np.array([[1, 1, 1, 0, 0, 0],
                 [0, 1, 1, 1, 0, 0],
                 [0, 1, 1, 1, 0, 0],
                 [1, 1, 1, 0, 0, 0],
                 [1, 1, 1, 1, 1, 0]])
print unique_rows(data)

导致结果如下:

[[0 1 1 1 0 0]
 [1 1 1 0 0 0]
 [1 1 1 1 1 0]]

这似乎非常慢,几乎和使用元组一样慢。像这样排序结构化数组是很慢的,显然。 - cge
3
请尝试使用更大尺寸的数组。是的,对于排序numpy数组而言,比起列表来说速度会较慢。然而,在大多数使用ndarrays的情况下,速度并不是最主要的考虑因素,而是内存使用情况。使用元组列表将比此解决方案占用 大量 的内存。即使您有足够的内存,对于一个相当大的数组来说,将其转换为元组列表要比速度优势带来的额外开销更大。 - Joe Kington
@cge - 啊,我没注意到你在使用lexsort。我以为你是在提到使用元组列表。是的,在这种情况下,lexsort可能是更好的选择。我忘记了它,而是采用了过于复杂的解决方案。 - Joe Kington

20

np.unique运行于np.random.random(100).reshape(10,10),返回所有独特的单个元素,但您想要唯一的行,因此首先需要将它们放入元组中:

array = #your numpy array of lists
new_array = [tuple(row) for row in array]
uniques = np.unique(new_array)

那是我唯一认为你能够改变类型以实现所需目标的方式,但我不确定将列表迭代更改为元组是否符合你“不要遍历”的要求。


5
这个代码简洁明了,符合 Python 风格。在速度不是实际问题的情况下,我认为这种类型的解决方案比那些复杂且得票更高的答案更好。 - Bill Cheatham
我更喜欢这个解决方案而不是被接受的解决方案。速度对我来说不是问题,因为每次调用可能只有不到100行。这准确描述了如何执行行唯一性。 - rayryeng
6
对于我的数据,这个实际上不起作用,"uniques"包含唯一的元素。可能我误解了“array”的预期形状 - 你能更精确地说明吗? - FooBar
@ryan-saxe 我喜欢这是Pythonic的,但这不是一个好的解决方案,因为返回到“uniques”的行是排序的(因此与“array”中的行不同)。`B = np.array([[1,2],[2,1]]);A = np.unique([tuple(row) for row in B]);print(A) = array([[1, 2],[1, 2]])` - jmlarson

19

np.unique通过对一个扁平化的数组进行排序,然后查看每个项目是否等于前一个来实现。这也可以在不扁平化的情况下手动完成:

ind = np.lexsort(a.T)
a[ind[np.concatenate(([True],np.any(a[ind[1:]]!=a[ind[:-1]],axis=1)))]]

这种方法不使用元组,应该比这里给出的其他方法更快、更简单。

注意:先前的版本没有在a[之后加上ind,这意味着使用了错误的索引。此外,Joe Kington 指出,这确实会产生各种中间副本。下面的方法通过制作排序副本,然后使用其视图来减少副本数量:

b = a[np.lexsort(a.T)]
b[np.concatenate(([True], np.any(b[1:] != b[:-1],axis=1)))]

这样做更快且占用较少内存。

另外,如果您想在 ndarray 中查找唯一行,而不管数组中有多少维,则可以使用以下方法:

b = a[lexsort(a.reshape((a.shape[0],-1)).T)];
b[np.concatenate(([True], np.any(b[1:]!=b[:-1],axis=tuple(range(1,a.ndim)))))]

一个有趣的问题是,如果你想要对任意维度数组中的任意轴进行排序/去重,这将会更加困难。

编辑:

为了展示速度差异,我在ipython中运行了三种不同方法的几个测试。对于 您的 确切情况,差别不大,尽管此版本稍微快一些:

In [87]: %timeit unique(a.view(dtype)).view('<i8')
10000 loops, best of 3: 48.4 us per loop

In [88]: %timeit ind = np.lexsort(a.T); a[np.concatenate(([True], np.any(a[ind[1:]]!= a[ind[:-1]], axis=1)))]
10000 loops, best of 3: 37.6 us per loop

In [89]: %timeit b = [tuple(row) for row in a]; np.unique(b)
10000 loops, best of 3: 41.6 us per loop

然而,使用更大的a值,这个版本会变得快得多,快得多。

In [96]: a = np.random.randint(0,2,size=(10000,6))

In [97]: %timeit unique(a.view(dtype)).view('<i8')
10 loops, best of 3: 24.4 ms per loop

In [98]: %timeit b = [tuple(row) for row in a]; np.unique(b)
10 loops, best of 3: 28.2 ms per loop

In [99]: %timeit ind = np.lexsort(a.T); a[np.concatenate(([True],np.any(a[ind[1:]]!= a[ind[:-1]],axis=1)))]
100 loops, best of 3: 3.25 ms per loop

非常好!顺便说一下,它确实会生成几个中间副本(例如a[ind[1:]]是一个副本等),但另一方面,你的解决方案通常比我的快2-3倍,直到你的内存用完为止。 - Joe Kington
好的观点。事实证明,仅使用索引来取出中间副本的尝试使我的方法使用更多的内存,并且最终比只是制作排序后的数组副本更慢,因为a_sorted [1:]不是a_sorted的副本。 - cge
您的时间测量中的dtype是什么?我认为你搞错了。在我的系统上,按照我的答案所述调用np.unique比使用您两种np.lexsort中的任何一种都要稍快。如果要查找唯一数组的形状为(10000, 100),它大约快5倍。即使您决定重新实现np.unique以减少一些(次要)执行时间,将每行折叠成单个对象也比必须在列的比较上调用np.any更快,特别是对于更高的列数。 - Jaime
@cge:您可能是想用 'np.any' 而不是标准的 'any',后者不接受关键字参数。 - M. Toya
@Jaime - 我认为 dtype 只是 a.dtype,即正在查看的数据的数据类型,就像 Joe Kington 在他的回答中所做的那样。如果有许多列,则使用 lexsort 保持快速的另一种(不完美!)方法是仅对几列进行排序。这是特定于数据的,因为需要知道哪些列提供足够的差异以完美地排序。例如,a.shape = (60000, 500) - 在前3列上排序:ind = np.lexsort((a[:, 2], a[:, 1], a[:, 0]))。时间节省相当可观,但免责声明再次提醒:它可能无法捕获所有情况 - 这取决于数据。 - n1k31t4

12

我已经比较了建议的替代方法的速度,并惊讶地发现,空视图unique解决方案甚至比numpy的本地unique带有axis参数更快。如果你想要速度,你会选择

numpy.unique(
    a.view(numpy.dtype((numpy.void, a.dtype.itemsize*a.shape[1])))
).view(a.dtype).reshape(-1, a.shape[1])

我已经在 npx.unique_rows 中实现了最快的变体。

这也有一个在 GitHub 上的错误报告

输入图像描述


重现绘图的代码:

import numpy
import perfplot


def unique_void_view(a):
    return (
        numpy.unique(a.view(numpy.dtype((numpy.void, a.dtype.itemsize * a.shape[1]))))
        .view(a.dtype)
        .reshape(-1, a.shape[1])
    )


def lexsort(a):
    ind = numpy.lexsort(a.T)
    return a[
        ind[numpy.concatenate(([True], numpy.any(a[ind[1:]] != a[ind[:-1]], axis=1)))]
    ]


def vstack(a):
    return numpy.vstack([tuple(row) for row in a])


def unique_axis(a):
    return numpy.unique(a, axis=0)


perfplot.show(
    setup=lambda n: numpy.random.randint(2, size=(n, 20)),
    kernels=[unique_void_view, lexsort, vstack, unique_axis],
    n_range=[2 ** k for k in range(15)],
    xlabel="len(a)",
    equality_check=None,
)

1
非常好的答案,只有一个小点:vstack_dict 从未使用过字典,花括号是集合生成式,因此它的行为几乎与 vstack_set 相同。由于 vstack_dict 的性能线在图表中缺失,看起来它只是被 vstack_set 性能图所覆盖,因为它们非常相似! - Akavall
谢谢回复。我已经改进了绘图,只包括一个 vstack 变量。 - Nico Schlömer

9
这是@Greg提供的Pythonic答案的另一种变体。
np.vstack(set(map(tuple, a)))

8
我不喜欢这些答案中的任何一个,因为它们都不能以线性代数或向量空间的方式处理浮点数组,其中两行“相等”的意思是“在某个容差范围内”。唯一有容差阈值的答案 https://dev59.com/mmQn5IYBdhLWcg3wRVRs#26867764,将容差阈值同时应用于元素和小数精度,这适用于某些情况,但不如真正的向量距离在数学上更加通用。
这是我的版本:
from scipy.spatial.distance import squareform, pdist

def uniqueRows(arr, thresh=0.0, metric='euclidean'):
    "Returns subset of rows that are unique, in terms of Euclidean distance"
    distances = squareform(pdist(arr, metric=metric))
    idxset = {tuple(np.nonzero(v)[0]) for v in distances <= thresh}
    return arr[[x[0] for x in idxset]]

# With this, unique columns are super-easy:
def uniqueColumns(arr, *args, **kwargs):
    return uniqueRows(arr.T, *args, **kwargs)

上面的公共函数使用 scipy.spatial.distance.pdist 来找到每对行之间的欧几里得(可定制)距离。然后它将每个距离与 threshold 进行比较,以找到彼此相距不超过 thresh 的行,并仅返回每个 thresh-cluster 中的一行。
正如提示的那样,距离度量标准不必是欧几里得距离——pdist 可以计算各种距离,包括 cityblock(曼哈顿范数)和 cosine(向量之间的夹角)。
如果 thresh=0(默认值),则必须完全相同才被视为“唯一”。其他好的 thresh 值使用缩放的机器精度,即 thresh=np.spacing(1)*1e3

最佳答案。谢谢。这是到目前为止最(数学上)概括的答案。它将矩阵视为N维空间中的数据点或样本集,并找到一组相同或相似的点(相似性由欧几里德距离或任何其他方法定义)。这些点可以是重叠的数据点或非常接近的邻域。最后,一组相同或相似的点被同一集合中的任何一个点(在上面的答案中是第一个点)替换。这有助于从点云中减少冗余。 - Sanchit
@Sanchit 嗯,这是个好点子,函数可以让用户指定如何选择每个 thresh 大小邻域的代表点,而不是选择“第一个”点(实际上,它可能是有效的随机点,因为它取决于 Python 如何在 set 中存储点),例如使用“中位数”或最接近质心的点等。 - Ahmed Fasih
当然。毫无疑问。我只是提到了第一个点,因为这正是你的程序正在做的事情,完全没有问题。 - Sanchit
只是一个更正 - 我之前错误地说,由于set的无序性,每个thresh簇将选择哪一行是随机的。当然,这是我的一个脑抽,set存储了在thresh邻域中的索引元组,因此对于每个thresh簇,findRows实际上返回其中的第一行。 - Ahmed Fasih

4

为什么不使用pandas的drop_duplicates函数:

>>> timeit pd.DataFrame(image.reshape(-1,3)).drop_duplicates().values
1 loops, best of 3: 3.08 s per loop

>>> timeit np.vstack({tuple(r) for r in image.reshape(-1,3)})
1 loops, best of 3: 51 s per loop

我真的很喜欢这个答案。当然,它并没有直接使用numpy,但对我来说,它是最容易理解且速度最快的一个。 - noctilux

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接