获取两个二维numpy数组中相交的行

55

我想要获取两个2D numpy数组中的交集(共同行)。例如,如果以下数组被作为输入传递:

array([[1, 4],
       [2, 5],
       [3, 6]])

array([[1, 4],
       [3, 6],
       [7, 8]])

输出应该是:

array([[1, 4],
       [3, 6])

我知道如何使用循环来完成这个任务,但我正在寻找一种Pythonic/Numpy的方法来完成。

11个回答

52

对于短数组,使用集合可能是最清晰和最易读的方法。

另一种方法是使用numpy.intersect1d。不过你需要把行当做单个值来处理...这会使事情变得稍微难以理解些...

import numpy as np

A = np.array([[1,4],[2,5],[3,6]])
B = np.array([[1,4],[3,6],[7,8]])

nrows, ncols = A.shape
dtype={'names':['f{}'.format(i) for i in range(ncols)],
       'formats':ncols * [A.dtype]}

C = np.intersect1d(A.view(dtype), B.view(dtype))

# This last bit is optional if you're okay with "C" being a structured array...
C = C.view(A.dtype).reshape(-1, ncols)

对于大型数组而言,这应该比使用集合要快得多。


2
np.intersect1d(a, b).reshape(-1, ncols) 能够达到相同的结果吗? - Rob Cowie
撤销之前的评论...你是完全正确的...它应该在所有情况下都能工作。 - Joe Kington
3
实际上不行(我之前意识到了,然后又忘了!)如果没有结构化的 dtype,它就不会将事物视为行,只是“原始”的数字。考虑一些像 A = np.array([[4,1],[2,5],[3,6]])B = np.array([[1,4],[3,6],[7,8]]) 这样的东西。 - Joe Kington
1
@Karthik - 你是否遇到了ValueError: zero length field name in format的错误?我使用了新式字符串格式化。在Python2.6上,你需要将'names':['f{}'.format...替换为'names':['f{0}'.format.... - Joe Kington
3
您可以使用以下代码替换 'dtype' 行:dtype = (', '.join([str(A.dtype)]*ncols))。由于名称未指定,因此默认为 f0、f1 等。 - Henry Schreiner
显示剩余3条评论

23
你可以使用Python的集合(set):
>>> import numpy as np
>>> A = np.array([[1,4],[2,5],[3,6]])
>>> B = np.array([[1,4],[3,6],[7,8]])
>>> aset = set([tuple(x) for x in A])
>>> bset = set([tuple(x) for x in B])
>>> np.array([x for x in aset & bset])
array([[1, 4],
       [3, 6]])

如Rob Cowie指出的那样,可以更简洁地完成此操作

np.array([x for x in set(tuple(x) for x in A) & set(tuple(x) for x in B)])

可能有一种方法可以在不使用数组到元组的来回转换的情况下完成这个任务,但现在我想不出来了。


2
我同意。找不到任何“本地”的numpy方法来完成它。一行版本可能是common = set(tuple(i) for i in A) & set(tuple(i) for i in B) - Rob Cowie
2
如果你要使用set,你可以使用交集函数:set.intersection(aset, bset)。 - rhinoinrepose
1
@rhinoinrepose - 使用函数比使用 & 更快吗? - mtrw

12
我不明白为什么没有建议使用纯numpy方法来解决这个问题。因此,我找到了一种方法,它使用了numpy广播。基本思想是通过轴交换将其中一个数组转换为3d。让我们构建2个数组:
a=np.random.randint(10, size=(5, 3))
b=np.zeros_like(a)
b[:4,:]=a[np.random.randint(a.shape[0], size=4), :]

通过我的运行,它给出了:

a=array([[5, 6, 3],
   [8, 1, 0],
   [2, 1, 4],
   [8, 0, 6],
   [6, 7, 6]])
b=array([[2, 1, 4],
   [2, 1, 4],
   [6, 7, 6],
   [5, 6, 3],
   [0, 0, 0]])

步骤如下(数组可以交换):
#a is nxm and b is kxm
c = np.swapaxes(a[:,:,None],1,2)==b #transform a to nx1xm
# c has nxkxm dimensions due to comparison broadcast
# each nxixj slice holds comparison matrix between a[j,:] and b[i,:]
# Decrease dimension to nxk with product:
c = np.prod(c,axis=2)
#To get around duplicates://
# Calculate cumulative sum in k-th dimension
c= c*np.cumsum(c,axis=0)
# compare with 1, so that to get only one 'True' statement by row
c=c==1
#//
# sum in k-th dimension, so that a nx1 vector is produced
c=np.sum(c,axis=1).astype(bool)
# The intersection between a and b is a[c]
result=a[c]

在一个用于减少内存使用的拥有2行代码的函数中(如果我说错了请纠正):

def array_row_intersection(a,b):
   tmp=np.prod(np.swapaxes(a[:,:,None],1,2)==b,axis=2)
   return a[np.sum(np.cumsum(tmp,axis=0)*tmp==1,axis=1).astype(bool)]

这是我的示例结果:

result=array([[5, 6, 3],
       [2, 1, 4],
       [6, 7, 6]])

这种方法比设置解决方案更快,因为它仅使用简单的numpy操作,同时不断减少维数,非常适用于两个大矩阵。我想我在评论中可能犯了错误,因为我是通过试验和直觉得出答案的。列交集的等效方法可以通过转置数组或稍微改变步骤来找到。此外,如果需要重复项,则必须跳过“//”内部的步骤。该函数可以编辑以仅返回索引的布尔数组,这对我很有用,因为我尝试使用相同的向量获取不同的数组索引。投票答案和我的基准测试(每个维度中的元素数量对选择什么起作用):
代码:
def voted_answer(A,B):
    nrows, ncols = A.shape
    dtype={'names':['f{}'.format(i) for i in range(ncols)],
           'formats':ncols * [A.dtype]}
    C = np.intersect1d(A.view(dtype), B.view(dtype))
    return C.view(A.dtype).reshape(-1, ncols)

a_small=np.random.randint(10, size=(10, 10))
b_small=np.zeros_like(a_small)
b_small=a_small[np.random.randint(a_small.shape[0],size=[a_small.shape[0]]),:]
a_big_row=np.random.randint(10, size=(10, 1000))
b_big_row=a_big_row[np.random.randint(a_big_row.shape[0],size=[a_big_row.shape[0]]),:]
a_big_col=np.random.randint(10, size=(1000, 10))
b_big_col=a_big_col[np.random.randint(a_big_col.shape[0],size=[a_big_col.shape[0]]),:]
a_big_all=np.random.randint(10, size=(100,100))
b_big_all=a_big_all[np.random.randint(a_big_all.shape[0],size=[a_big_all.shape[0]]),:]



print 'Small arrays:'
print '\t Voted answer:',timeit.timeit(lambda:voted_answer(a_small,b_small),number=100)/100
print '\t Proposed answer:',timeit.timeit(lambda:array_row_intersection(a_small,b_small),number=100)/100
print 'Big column arrays:'
print '\t Voted answer:',timeit.timeit(lambda:voted_answer(a_big_col,b_big_col),number=100)/100
print '\t Proposed answer:',timeit.timeit(lambda:array_row_intersection(a_big_col,b_big_col),number=100)/100
print 'Big row arrays:'
print '\t Voted answer:',timeit.timeit(lambda:voted_answer(a_big_row,b_big_row),number=100)/100
print '\t Proposed answer:',timeit.timeit(lambda:array_row_intersection(a_big_row,b_big_row),number=100)/100
print 'Big arrays:'
print '\t Voted answer:',timeit.timeit(lambda:voted_answer(a_big_all,b_big_all),number=100)/100
print '\t Proposed answer:',timeit.timeit(lambda:array_row_intersection(a_big_all,b_big_all),number=100)/100

带有结果:

Small arrays:
     Voted answer: 7.47108459473e-05
     Proposed answer: 2.47001647949e-05
Big column arrays:
     Voted answer: 0.00198730945587
     Proposed answer: 0.0560171294212
Big row arrays:
     Voted answer: 0.00500325918198
     Proposed answer: 0.000308241844177
Big arrays:
     Voted answer: 0.000864889621735
     Proposed answer: 0.00257176160812

结论是,如果您需要比较两个大的二维点数组,则使用被投票选为答案的方法。如果在所有维度上都有大矩阵,则被投票选为答案的方法无疑是最好的。因此,每次取决于您选择什么。


出现 ValueError: 'axis' 条目超出了 'c = np.prod(c,axis=2)' 行的范围。 - MajesticRa
很奇怪,c应该有3个维度,通过发出命令`c = np.swapaxes(a[:,:,None],1,2)==b' .... - Vasilis Lemonidis
非常整洁。通过进行小的修改,还可以返回索引。np.where(np.prod(np.swapaxes(Array_A[:,:,None],1,2) == Array_B,axis=2).astype(bool)) - bmg

11

Numpy 广播

我们可以使用广播创建布尔掩码,然后用该掩码过滤在数组 B 中也存在的数组 A 的行。

A = np.array([[1,4],[2,5],[3,6]])
B = np.array([[1,4],[3,6],[7,8]])

m = (A[:, None] == B).all(-1).any(1)

>>> A[m]

array([[1, 4],
       [3, 6]])

5
使用结构化数组实现这一点的另一种方法:
>>> a = np.array([[3, 1, 2], [5, 8, 9], [7, 4, 3]])
>>> b = np.array([[2, 3, 0], [3, 1, 2], [7, 4, 3]])
>>> av = a.view([('', a.dtype)] * a.shape[1]).ravel()
>>> bv = b.view([('', b.dtype)] * b.shape[1]).ravel()
>>> np.intersect1d(av, bv).view(a.dtype).reshape(-1, a.shape[1])
array([[3, 1, 2],
       [7, 4, 3]])

为了更清晰,结构化视图如下:

>>> a.view([('', a.dtype)] * a.shape[1])
array([[(3, 1, 2)],
       [(5, 8, 9)],
       [(7, 4, 3)]],
       dtype=[('f0', '<i8'), ('f1', '<i8'), ('f2', '<i8')])

2
np.array(set(map(tuple, b)).difference(set(map(tuple, a))))

这也可以起作用。

2
A = np.array([[1,4],[2,5],[3,6]])
B = np.array([[1,4],[3,6],[7,8]])

def matching_rows(A,B):
  matches=[i for i in range(B.shape[0]) if np.any(np.all(A==B[i],axis=1))]
  if len(matches)==0:
    return B[matches]
  return np.unique(B[matches],axis=0)

>>> matching_rows(A,B)
array([[1, 4],
       [3, 6]])


当然,这假设所有行的长度都相同。

2

没有索引 访问https://gist.github.com/RashidLadj/971c7235ce796836853fcf55b4876f3c

def intersect2D(Array_A, Array_B):
"""
Find row intersection between 2D numpy arrays, a and b.
"""

# ''' Using Tuple ''' #
intersectionList = list(set([tuple(x) for x in Array_A for y in Array_B  if(tuple(x) == tuple(y))]))
print ("intersectionList = \n",intersectionList)

# ''' Using Numpy function "array_equal" ''' #
""" This method is valid for an ndarray """
intersectionList = list(set([tuple(x) for x in Array_A for y in Array_B  if(np.array_equal(x, y))]))
print ("intersectionList = \n",intersectionList)

# ''' Using set and bitwise and '''
intersectionList = [list(y) for y in (set([tuple(x) for x in Array_A]) & set([tuple(x) for x in Array_B]))]
print ("intersectionList = \n",intersectionList)

return intersectionList

使用索引 访问https://gist.github.com/RashidLadj/bac71f3d3380064de2f9abe0ae43c19e

def intersect2D(Array_A, Array_B):
  """
  Find row intersection between 2D numpy arrays, a and b.
  Returns another numpy array with shared rows and index of items in A & B arrays
  """
  # [[IDX], [IDY], [value]] where Equal
  # ''' Using Tuple ''' #
  IndexEqual = np.asarray([(i, j, x) for i,x in enumerate(Array_A) for j, y in enumerate (Array_B)  if(tuple(x) == tuple(y))]).T
  
  # ''' Using Numpy array_equal ''' #
  IndexEqual = np.asarray([(i, j, x) for i,x in enumerate(Array_A) for j, y in enumerate (Array_B)  if(np.array_equal(x, y))]).T
  
  idx, idy, intersectionList = (IndexEqual[0], IndexEqual[1], IndexEqual[2]) if len(IndexEqual) != 0 else ([], [], [])

  return intersectionList, idx, idy

1
所有提到的方法在处理非常大的数组时都很慢。我找到了一种使用numpy的方法。由于numpy只提供1D数组的np.in1d函数,我们可以使用cantor pairing将2D数组编码为1D数组,然后使用numpy的函数。
def cantor_pairing(a, b):
    return (a + b) * (a + b + 1) / 2 + a

def intersecting_indices(a, b):

    pair_a = cantor_pairing(cantor_pairing(a[:,0], a[:, 1]), a[:, 2])
    pair_b = cantor_pairing(cantor_pairing(b[:,0], b[:, 1]), b[:, 2])

    boolean_array = np.in1d(pair_a, pair_b)
    intersected_indices = np.where(bool_array==True)[0]

    return intersected_indices

这是处理非常大的数组的最快方法。 需要记住的一件重要事情是,康托对偶只能用于非负整数。所以如果你的数组中有负整数,在使用之前将它们都转为正数(通过加上最小值)。

0
import numpy as np

A=np.array([[1, 4],
       [2, 5],
       [3, 6]])

B=np.array([[1, 4],
       [3, 6],
       [7, 8]])

intersetingRows=[(B==irow).all(axis=1).any() for irow in A]
print(A[intersetingRows])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接