两个多维数组之间的交集(带有容差)- NumPy/Python

3

我遇到了一个问题。我有两个2-D的numpy数组,填充了x和y坐标。这些数组可能看起来像:

array1([[(1.22, 5.64)],
   [(2.31, 7.63)],
   [(4.94, 4.15)]],

array2([[(1.23, 5.63)],
   [(6.31, 10.63)],
   [(2.32, 7.65)]],

现在我需要找到“重复节点”。然而,我还必须考虑坐标在给定公差范围内的节点相等,因此,我不能使用像这个的解决方案。由于我的数组非常大(每个数组约为200,000行),两个简单的for循环也不是一个选项。我的最终输出应该如下所示:

output([[(1.23, 5.63)],
   [(2.32, 7.65)]],

我需要一些提示。

谢谢,


1
一定要尝试使用pandas库。它专为大数据集而设计,内置了交集函数。 - Yserbius
1
也许你可以通过四舍五入小数 np.around(array1, 1) 或向上取整值 np.ceil(array1) 来近似你的结果。 - J. Doe
你尝试过这些解决方案中的任何一个吗? - Divakar
首先,很抱歉回复晚了,也感谢提供的所有有用的方法。不幸的是,在不修改我的初始问题的情况下,我无法使用任何一个建议。有些建议耗时太长,而另一些则占用内存过多。尽管如此,我将所有已尝试并基本解决所提出问题的答案都标记为有用。 - user6270168
@SebastianG 那么,你最终是如何解决你的问题的呢? 你是否找到了比所有列出的解决方案都更好的东西? 如果是这样,能否分享一下? - Divakar
5个回答

4
为了比较两个节点,我建议使用numpy.isclose()函数,您可以设置相对和绝对公差。
numpy.isclose(1.24, 1.25, atol=1e-1)
# [True]
numpy.isclose([1.24, 2.31], [1.25, 2.32], atol=1e-1)
# [True, True]

不必使用两个for循环,您可以利用itertools.product()包,遍历所有对。以下代码实现了您想要的功能:

array1 = np.array([[1.22, 5.64],
                   [2.31, 7.63],
                   [4.94, 4.15]])

array2 = np.array([[1.23, 5.63],
                   [6.31, 10.63],
                   [2.32, 7.64]])

output = np.empty((0,2))
for i0, i1 in itertools.product(np.arange(array1.shape[0]),
                                np.arange(array2.shape[0])):
    if np.all(np.isclose(array1[i0], array2[i1], atol=1e-1)):
         output = np.concatenate((output, [array2[i1]]), axis=0)
# output = [[ 1.23  5.63]
#           [ 2.32  7.64]]

2

定义一个类似于numpy.iscloseisclose函数,但速度更快(主要是因为不检查任何输入并且不支持相对和绝对容差):

import numpy as np

array1 = np.array([[(1.22, 5.64)],
                   [(2.31, 7.63)],
                   [(4.94, 4.15)]])

array2 = np.array([[(1.23, 5.63)],
                    [(6.31, 10.63)],
                    [(2.32, 7.65)]])

def isclose(x, y, atol):
    return np.abs(x - y) < atol

现在来到了难点。我们需要计算内部维度中是否有任何两个值是接近的。为此,我会重新塑造数组,使第一个数组的值沿着第二个维度复制到第一维和第二维上,而第二个数组的值沿着第一维复制到第二维上(注意1, 33, 1):

In [92]: isclose(array1.reshape(1,3,2), array2.reshape(3,1,2), 0.03)
Out[92]: 
array([[[ True,  True],
        [False, False],
        [False, False]],

       [[False, False],
        [False, False],
        [False, False]],

       [[False, False],
        [ True,  True],
        [False, False]]], dtype=bool)

现在我们想要所有数值接近其他数值的条目(在同一维度上):

In [93]: isclose(array1.reshape(1,3,2), array2.reshape(3,1,2), 0.03).any(axis=0)
Out[93]: 
array([[ True,  True],
       [ True,  True],
       [False, False]], dtype=bool)

然后我们只想要那些元组中两个值都接近的:

In [111]: isclose(array1.reshape(1,3,2), array2.reshape(3,1,2), 0.03).any(axis=0).all(axis=-1)
Out[111]: array([ True,  True, False], dtype=bool)

最后,我们可以使用这个来索引array1:
In [112]: array1[isclose(array1.reshape(1,3,2), array2.reshape(3,1,2), 0.03).any(axis=0).all(axis=-1)]
Out[112]: 
array([[[ 1.22,  5.64]],

       [[ 2.31,  7.63]]])

如果您想的话,可以交换anyall的调用。在您的情况下,其中一个可能比另一个更快。 reshape调用中的3需要替换为您数据的实际长度。
该算法将具有与使用itertools.product的其他答案相同的糟糕运行时,但至少实际循环是由numpy隐式执行的并且是用C实现的。这在计时中可见。
In [122]: %timeit array1[isclose(array1.reshape(1,len(array1),2), array2.reshape(len(array2),1,2), 0.03).any(axis=0).all(axis=-1)]
11.6 µs ± 493 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [126]: %timeit pares(array1_pares, array2_pares)
267 µs ± 8.72 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

函数pares是由@Ferran Parés另一个答案中定义的代码,数组已经被重塑。

对于更大的数组,这变得更加明显:

array1 = np.random.normal(0, 0.1, size=(1000, 1, 2))
array2 = np.random.normal(0, 0.1, size=(1000, 1, 2))

array1_pares = array1.reshape(1000, 2)
array2_pares = arra2.reshape(1000, 2)

In [149]: %timeit array1[isclose(array1.reshape(1,len(array1),2), array2.reshape(len(array2),1,2), 0.03).any(axis=0).all(axis=-1)]
135 µs ± 5.34 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [157]: %timeit pares(array1_pares, array2_pares)
1min 36s ± 6.85 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

最终,这受可用系统内存的限制。我的机器(16GB RAM)仍然可以处理长度为20000的数组,但这几乎将其推到了100%。这也需要大约12秒:

In [14]: array1 = np.random.normal(0, 0.1, size=(20000, 1, 2))
In [15]: array2 = np.random.normal(0, 0.1, size=(20000, 1, 2))
In [16]: %timeit array1[isclose(array1.reshape(1,len(array1),2), array2.reshape(len(array2),1,2), 0.03).any(axis=0).all(axis=-1)]
12.3 s ± 514 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1
那么,对于20万个点,我们需要1600GB的RAM,是吗?看来我们需要等待未来的到来。 - Divakar
@Divakar 好吧,要么就使用更好的算法(比如你的答案中提到的)。 - Graipher

2

有很多可能的方法来定义这个公差。由于我们谈论的是XY坐标,最可能是指欧几里得距离来设置该公差值。因此,我们可以使用Cython-powered kd-tree进行快速最近邻查找,这既在内存方面又在性能方面非常高效。实现代码应该类似于这样 -

from scipy.spatial import cKDTree

# Assuming a default tolerance value of 1 here
def intersect_close(a, b, tol=1):
    # Get closest distances for each pt in b
    dist = cKDTree(a).query(b, k=1)[0] # k=1 selects closest one neighbor

    # Check the distances against the given tolerance value and 
    # thus filter out rows off b for the final output
    return b[dist <= tol]

示例逐步运行 -

# Input 2D arrays
In [68]: a
Out[68]: 
array([[1.22, 5.64],
       [2.31, 7.63],
       [4.94, 4.15]])

In [69]: b
Out[69]: 
array([[ 1.23,  5.63],
       [ 6.31, 10.63],
       [ 2.32,  7.65]])

# Get closest distances for each pt in b
In [70]: dist = cKDTree(a).query(b, k=1)[0]

In [71]: dist
Out[71]: array([0.01414214, 5.        , 0.02236068])

# Mask of distances within the given tolerance
In [72]: tol = 1

In [73]: dist <= tol
Out[73]: array([ True, False,  True])

# Finally filter out valid ones off b
In [74]: b[dist <= tol]
Out[74]: 
array([[1.23, 5.63],
       [2.32, 7.65]])

20万个点的计时 -

In [20]: N = 200000
    ...: np.random.seed(0)
    ...: a = np.random.rand(N,2)
    ...: b = np.random.rand(N,2)

In [21]: %timeit intersect_close(a, b)
1 loop, best of 3: 1.37 s per loop

0
如评论所述,对您的数字进行缩放和舍入可能允许您使用intersect1d或等效方法。
如果您只有两列,将其转换为具有复合数据类型的1d数组可能有效。
但是,您可能还需要记住intersect1d的功能:
if not assume_unique:
    # Might be faster than unique( intersect1d( ar1, ar2 ) )?
    ar1 = unique(ar1)
    ar2 = unique(ar2)
aux = np.concatenate((ar1, ar2))
aux.sort()
return aux[:-1][aux[1:] == aux[:-1]]

unique 已经增强了处理行(axis 参数),但是 intersect 没有。无论如何,它使用 argsort 将相似的元素放在一起,然后跳过重复项。

请注意,intersect 连接唯一数组,排序,然后再次查找重复项。

我知道您不想要循环版本,但为了促进问题的概念化,这里还是有一个:

In [581]: a = np.array([(1.22, 5.64),
     ...:    (2.31, 7.63),
     ...:    (4.94, 4.15)])
     ...: 
     ...: b = np.array([(1.23, 5.63),
     ...:    (6.31, 10.63),
     ...:    (2.32, 7.65)])
     ...:    

我已经移除了你的数组中的一层嵌套。

In [582]: c = []
In [583]: for a1 in a:
     ...:     for b1 in b:
     ...:         if np.allclose(a1,b1, atol=0.5): c.append((a1,b1))

或者使用列表推导式

In [586]: [(a1,b1) for a1 in a for b1 in b if np.allclose(a1,b1,atol=0.5)]
Out[586]: 
[(array([1.22, 5.64]), array([1.23, 5.63])),
 (array([2.31, 7.63]), array([2.32, 7.65]))]

复杂逼近

In [604]: aa = (a*10).astype(int)
In [605]: aa
Out[605]: 
array([[12, 56],
       [23, 76],
       [49, 41]])
In [606]: ac=aa[:,0]+1j*aa[:,1]
In [607]: bb = (b*10).astype(int)
In [608]: bc=bb[:,0]+1j*bb[:,1]
In [609]: np.intersect1d(ac,bc)
Out[609]: array([12.+56.j, 23.+76.j])

灵感相交

连接数组,排序,取差集,并找到小的差异:

In [616]: ab = np.concatenate((a,b),axis=0)
In [618]: np.lexsort(ab.T)
Out[618]: array([2, 3, 0, 1, 5, 4], dtype=int32)
In [619]: ab1 = ab[_,:]
In [620]: ab1
Out[620]: 
array([[ 4.94,  4.15],
       [ 1.23,  5.63],
       [ 1.22,  5.64],
       [ 2.31,  7.63],
       [ 2.32,  7.65],
       [ 6.31, 10.63]])
In [621]: ab1[1:]-ab1[:-1]
Out[621]: 
array([[-3.71,  1.48],
       [-0.01,  0.01],
       [ 1.09,  1.99],
       [ 0.01,  0.02],
       [ 3.99,  2.98]])

In [623]: ((ab1[1:]-ab1[:-1])<.1).all(axis=1)  # refine with abs
Out[623]: array([False,  True, False,  True, False])
In [626]: np.where(Out[623])
Out[626]: (array([1, 3], dtype=int32),)
In [627]: ab[_]
Out[627]: 
array([[2.31, 7.63],
       [1.23, 5.63]])

0

也许你可以尝试使用纯NP和自定义函数:

import numpy as np
#Your Example
xDA=np.array([[1.22, 5.64],[2.31, 7.63],[4.94, 4.15],[6.1,6.2]])
yDA=np.array([[1.23, 5.63],[6.31, 10.63],[2.32, 7.65],[3.1,9.2]])
###Try this large sample###
#xDA=np.round(np.random.uniform(1,2, size=(5000, 2)),2)
#yDA=np.round(np.random.uniform(1,2, size=(5000, 2)),2)

print(xDA)
print(yDA)

#Match x to y
def np_matrix(myx,myy,calp=0.2):
    Xxx = np.transpose(np.repeat(myx[:, np.newaxis], myy.size, axis=1))
    Yyy = np.repeat(myy[:, np.newaxis], myx.size, axis=1)

    # define a caliper
    matches = {}
    dist = np.abs(Xxx - Yyy)
    for m in range(0, myx.size):
        if (np.min(dist[:, m]) <= calp) or not calp:
            matches[m] = np.argmin(dist[:, m])
    return matches


alwd_dist=0.1

xc1=xDA[:,1]
yc1=yDA[:,1]
m1=np_matrix(xc1,yc1,alwd_dist)
xc0=xDA[:,0]
yc0=yDA[:,0]
m0=np_matrix(xc0,yc0,alwd_dist)

shared_items = set(m1.items()) & set(m0.items())
if (int(len(shared_items))==0):
    print("No Matched Items based on given allowed distance:",alwd_dist)
else:
    print("Matched:")
    for ke in shared_items:
        print(xDA[ke[0]],yDA[ke[1]])

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接