如何从一个序列中找到距离另一个序列最近的邻居索引

4
我有一个目标数组A,代表了NCEP再分析数据中等压面的压力水平。 我还有一个云被观察到的压力长时间序列B。
我正在寻找一个k最近邻查找算法,返回最近邻居的索引,类似于Matlab中的knnsearch。在Python中可以表示为: indices, distance = knnsearch(A, B, n),其中indices是B中每个值的前n个最近的A中的索引,distance是值B与最近的A值之间的距离。数组A和B可以具有不同的长度(这是目前大多数解决方案的瓶颈,因为我必须循环遍历B中的每个值来返回我的indices和distance)。
import numpy as np

A = np.array([1000, 925, 850, 700, 600, 500, 400, 300, 250, 200, 150, 100, 70, 50, 30, 20, 10]) # this is a fixed 17-by-1 array
B = np.array([923, 584.2, 605.3, 153.2]) # this can be any n-by-1 array
n = 2

我希望从indices, distance = knnsearch(A, B, n)得到以下结果:

indices = [[1, 2],[4, 5] etc...] 

A 中的 923 匹配到第一个 A[1]=925,然后匹配到 A[2]=850; 将 A 中的 584.2 匹配到第一个 A[4]=600,然后匹配到 A[5]=500

distance = [[72, 77],[15.8, 84.2] etc...]

其中,72 表示从 B 中查询的值到最近的 A 值之间的距离,例如:distance[0, 0] == np.abs(B[0] - A[1])

我想到的唯一解决方案是:

import numpy as np


def knnsearch(A, B, n):
    indices = np.zeros((len(B), n))
    distances = np.zeros((len(B), n))

    for i in range(len(B)):
        a = A
        for N in range(n):
            dif = np.abs(a - B[i])
            ind = np.argmin(dif)

            indices[i, N] = ind + N
            distances[i, N] = dif[ind + N]
            # remove this neighbour from from future consideration
            np.delete(a, ind)

    return indices, distances


array_A = np.array([1000, 925, 850, 700, 600, 500, 400, 300, 250, 200, 150, 100, 70, 50, 30, 20, 10])
array_B = np.array([923, 584.2, 605.3, 153.2])
neighbours = 2

indices, distances = knnsearch(array_A, array_B, neighbours)

print(indices)
print(distances)

返回:

[[ 1.  2.]
 [ 4.  5.]
 [ 4.  3.]
 [10. 11.]]

[[  2.   73. ]
 [ 15.8  84.2]
 [  5.3  94.7]
 [  3.2  53.2]]

如果我的A和B数组包含许多元素并且有许多最近的邻居,必须有一种方法可以消除for循环,以提高性能...

请帮忙!谢谢 :)


605.3的最近邻居是600和500而不是600和700,这是预期行为吗? - OriolAbril
@xg.plt.py 啊!不好意思,那是我的错。在提出问题的过程中,我有点解决了原来的问题。那些输出是手动输入的,受到我的错误的影响... 我会编辑以修复。你发现得真好! - JBright
1个回答

2
第二个循环可以轻易地进行向量化。最简单的方法是使用np.argsort 并选择对应于n个最小dif值的索引。然而,对于大型数组来说,由于只有n个值需要排序,因此最好使用np.argpartition
因此,代码应该像这样:
def vector_knnsearch(A, B, n):
    indices = np.empty((len(B), n))
    distances = np.empty((len(B), n))

    for i,b in enumerate(B):
        dif = np.abs(A - b)
        min_ind = np.argpartition(dif,n)[:n] # Returns the indexes of the 3 smallest
                                             # numbers but not necessarily sorted
        ind = min_ind[np.argsort(dif[min_ind])] # sort output of argpartition just in case
        indices[i, :] = ind
        distances[i, :] = dif[ind]

    return indices, distances

如评论中所述,第一个循环也可以使用meshgrid删除,但是构造meshgrid所需的额外内存和计算时间使得该方法在我尝试的维度上较慢(对于大数组,这可能会变得更糟,并导致内存错误)。此外,代码可读性降低。总体而言,这种方法可能不太符合Pythonic。

def mesh_knnsearch(A, B, n):
    m = len(B)
    rng = np.arange(m).reshape((m,1))
    Amesh, Bmesh = np.meshgrid(A,B)
    dif = np.abs(Amesh-Bmesh)
    min_ind = np.argpartition(dif,n,axis=1)[:,:n]
    ind = min_ind[rng,np.argsort(dif[rng,min_ind],axis=1)]

    return ind, dif[rng,ind]

不是必须将这个rng定义为二维数组才能检索a[rng[0],ind[0]]a[rng[1],ind[1]]等,并保持数组的维度,而是a[:,ind]检索a[:,ind[0]]a[:,ind[1]]等。


这很好。感谢 @xg.plt.py 的提示。我有一个问题,涉及到需要手动指定knnsearch的需求。 我正在使用xarray,并一直在使用数据数组的“mode ='nearest'”作为.sel功能。我想到,在幕后进行的过程是相同的,因为在我将“indices”指定到“.isel”中或让xarray在“isel”中使用“nearest”模式时,返回结果所需的时间大致相同...继续 - JBright
解决我在matlab中矢量化问题的一种方法是创建一个AB的网格,这样我就会有一个单独的向量,其中包含所有组合的A[i]B[i]。然后这将删除for i, b in enumerate(B),但通过A*len(B)增加新网格变量的内存。有没有Pythonic的方法来实现这个想法?感谢您的时间 :) - JBright

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接