Finding the closest point in an array - the inverse of a KDTree

4

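I have a very large ndarray A and a sorted list of points k (a small list, about 30 points).

For every element of A, I want to determine the closest element in the list of points k, together with its index. So something like: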

>>> A = np.asarray([3, 4, 5, 6])
>>> k = np.asarray([4.1, 3])
>>> values, indices
[3, 4.1, 4.1, 4.1], [1, 0, 0, 0]

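The problem is that A is very, very large, so I can't do something inefficient like adding a dimension to A, taking the absolute difference with k, and then taking the minimum of each column.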
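For reference, the inefficient approach described above can be sketched as follows. This is only an illustration of the baseline being ruled out; the function name `nearest_bruteforce` is made up for this example and does not appear in the original post.

```python
import numpy as np

def nearest_bruteforce(A, k):
    """Nearest element of k (and its index) for every element of A."""
    # dist has shape (len(k), len(A)); this temporary is what makes the
    # approach impractical for large A.
    dist = np.abs(A[np.newaxis, :] - k[:, np.newaxis])
    indices = dist.argmin(axis=0)  # row index of the minimum in each column
    return k[indices], indices

A = np.asarray([3, 4, 5, 6])
k = np.asarray([4.1, 3])
values, indices = nearest_bruteforce(A, k)
print(values, indices)
```

With 3,000,000 elements in A and 30 points in k, the float64 distance array alone would take 3,000,000 x 30 x 8 bytes, about 720 MB.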
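So far I have been using np.searchsorted, as shown in the second answer to "Find nearest value in numpy array", but even that is too slow. This is the code I used (modified to work with multiple values):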
def find_nearest(A,k):

    indicesClosest = np.searchsorted(k, A)
    flagToReduce = indicesClosest==k.shape[0]
    modifiedIndicesToAvoidOutOfBoundsException = indicesClosest.copy()
    modifiedIndicesToAvoidOutOfBoundsException[flagToReduce] -= 1
    flagToReduce = np.logical_or(flagToReduce,
                     np.abs(A-k[indicesClosest-1]) <
                     np.abs(A - k[modifiedIndicesToAvoidOutOfBoundsException]))
    flagToReduce = np.logical_and(indicesClosest > 0, flagToReduce)
    indicesClosest[flagToReduce] -= 1
    valuesClosest = k[indicesClosest]
    return valuesClosest, indicesClosest

I then thought of using scipy.spatial.KDTree:

>>> d = scipy.spatial.KDTree(k)
>>> d.query(A)

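This turns out to be much slower than the searchsorted solution.
On the other hand, the array A is always the same; only k changes. So it would be beneficial to build some auxiliary structure on A (like an "inverse KDTree") and then query it with the small array k.
Is there something like that?
Edit
At the moment I am using a variant of np.searchsorted that requires the array A to be sorted. The sorting can be done in advance as a preprocessing step, but the original order still has to be restored after computing the indices. This variant is about twice as fast as the one above.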
A = np.random.random(3000000)
k = np.random.random(30)

indices_sort = np.argsort(A)
sortedA = A[indices_sort]

inv_indices_sort = np.argsort(indices_sort)
k.sort()


def find_nearest(sortedA, k):
    midpoints = k[:-1] + np.diff(k)/2
    idx_aux = np.searchsorted(sortedA, midpoints)
    idx = []
    count = 0
    final_indices = np.zeros(sortedA.shape, dtype=int)
    old_obj = None
    for obj in idx_aux:
        if obj != old_obj:
            idx.append((obj, count))
            old_obj = obj
        count += 1
    old_idx = 0
    for idx_A, idx_k in idx:
        final_indices[old_idx:idx_A] = idx_k
        old_idx = idx_A
    final_indices[old_idx:] = len(k)-1

    indicesClosest = final_indices[inv_indices_sort] #<- this takes 90% of the time
    return k[indicesClosest], indicesClosest

The line that takes most of the time is the one that restores the indices to their original order.
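As a small sketch of that reordering step: the inverse permutation can be built either with a second argsort or with a scatter assignment, and the result can be unsorted either by gathering with the inverse permutation or by scattering with the forward one. The array `sorted_result` below is a random stand-in for `final_indices`, introduced only for this illustration. Whether one form is faster on a given machine would have to be measured; the sketch only checks that they agree.

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.random(1000)
n = A.size
indices_sort = np.argsort(A)  # forward permutation: sorted_A = A[indices_sort]

# Inverse permutation, built two ways.
inv_by_argsort = np.argsort(indices_sort)
inv_by_scatter = np.empty(n, dtype=np.intp)
inv_by_scatter[indices_sort] = np.arange(n)

# Hypothetical stand-in for the per-element result computed in sorted order.
sorted_result = rng.integers(0, 30, size=n)

# Restore the original order: gather with the inverse permutation ...
gathered = sorted_result[inv_by_argsort]
# ... or scatter with the forward permutation on the left-hand side.
scattered = np.empty_like(sorted_result)
scattered[indices_sort] = sorted_result

print(np.array_equal(inv_by_argsort, inv_by_scatter))
print(np.array_equal(gathered, scattered))
```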

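You have multiple values. So are you looping when using searchsorted? Can you show your searchsorted attempt? Or did you use this code - https://dev59.com/_3E85IYBdhLWcg3w3Xr6#26026189/? - Divakar
Please be more specific than "very, very large". Give the typical size of A. - Warren Weckesser
@Divakar Yes, I used that code :) I'll edit. - Ant
I don't think that is your attempt, since it can't handle multiple values in k. - Divakar
I don't think there is anything built in. What I have in mind is an array lookup into a table of nearest k, tabulated at some discrete bin size. That only works if the table has far fewer entries than A has values, and presumably only in low dimensions. - xioxox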
2 Answers

2

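Update:

The built-in function numpy.digitize does exactly what you need. It only takes a small trick: digitize assigns values to bins. We can turn k into bins by sorting the array and placing the bin boundaries exactly halfway between adjacent elements.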

import numpy as np

A = np.asarray([3, 4, 5, 6])
k = np.asarray([4.1, 3, 1])  # added another value to show that sorting/binning works

ki = np.argsort(k)
ks = k[ki]

i = np.digitize(A, (ks[:-1] + ks[1:]) / 2)

indices = ki[i]
values = ks[i]

print(values, indices)
# [ 3.   4.1  4.1  4.1] [1 0 0 0]

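New answer:

I would take a brute-force approach: for each element of k, make one vectorized pass over A and update the positions where the current element improves on the best approximation so far.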

import numpy as np

A = np.asarray([3, 4, 5, 6])
k = np.asarray([4.1, 3])

err = np.zeros_like(A) + np.inf  # keep track of error over passes

values = np.empty_like(A, dtype=k.dtype)
indices = np.empty_like(A, dtype=int)

for i, v in enumerate(k):
    d = np.abs(A - v)
    mask = d < err  # only update where v is closer to A
    values[mask] = v
    indices[mask] = i
    err[mask] = d[mask]

print(values, indices)
# [ 3.   4.1  4.1  4.1] [1 0 0 0]

This approach needs three temporary arrays the same size as A, so it will fail if memory is insufficient.


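Thanks for the answer! The brute-force solution is too slow. np.digitize is a nice idea, but I think it is no different from np.searchsorted, right? The interfaces are slightly different, but the speed should be about the same. The most likely route to an improvement is to exploit the fact that the matrix A never changes and only k does; that is, to preprocess A somehow into a format that makes the necessary computation easier. - Ant
@Ant, I think you are right. I am not familiar with searchsorted, so the similarity escaped me. Still, it may be worth trying digitize. Sometimes very similar NumPy functions show surprising differences in performance. - MB-F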
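The similarity discussed in these comments can be checked directly. For monotonically increasing bin edges, np.digitize with its default right=False returns the same indices as np.searchsorted with side="right". The sketch below uses random test data that is not part of the original thread.

```python
import numpy as np

rng = np.random.default_rng(0)
A = rng.random(1000)
ks = np.sort(rng.random(30))      # sorted points
edges = (ks[:-1] + ks[1:]) / 2    # bin boundaries halfway between neighbours

via_digitize = np.digitize(A, edges)
via_searchsorted = np.searchsorted(edges, A, side="right")

same = np.array_equal(via_digitize, via_searchsorted)
print(same)
```

Since both calls yield the same indices, any speed difference between them would come from implementation overhead and would need to be timed on real data.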

2

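So, after some work and an idea from the scipy mailing list, I think that in my case (constant A and slowly changing k) the best approach is the following implementation.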

class SearchSorted:
    def __init__(self, tensor, use_k_optimization=True):

        '''
        use_k_optimization requires storing 4x the size of the tensor.
        If use_k_optimization is True, the class will assume that successive calls will be made with similar k.
        When this happens, we can cut the running time significantly by storing additional variables. If it won't be
        called with successive k, set the flag to False, as otherwise would just consume more memory for no
        good reason
        '''

        self.indices_sort = np.argsort(tensor)
        self.sorted_tensor = tensor[self.indices_sort]
        self.inv_indices_sort = np.argsort(self.indices_sort)
        self.use_k_optimization = use_k_optimization

        self.previous_indices_results = None
        self.prev_idx_A_k_pair = None

    def query(self, k):
        midpoints = k[:-1] + np.diff(k) / 2
        idx_count = np.searchsorted(self.sorted_tensor, midpoints)
        idx_A_k_pair = []
        count = 0

        old_obj = 0
        for obj in idx_count:
            if obj != old_obj:
                idx_A_k_pair.append((obj, count))
                old_obj = obj
            count += 1

        if not self.use_k_optimization or self.previous_indices_results is None:
            #creates the index matrix in the sorted case
            final_indices = self._create_indices_matrix(idx_A_k_pair, self.sorted_tensor.shape, len(k))
            #and now unsort it to match the original tensor position
            indicesClosest = final_indices[self.inv_indices_sort]
            if self.use_k_optimization:
                self.prev_idx_A_k_pair = idx_A_k_pair
                self.previous_indices_results = indicesClosest
            return indicesClosest

        old_indices_unsorted = self._create_indices_matrix(self.prev_idx_A_k_pair, self.sorted_tensor.shape, len(k))
        new_indices_unsorted = self._create_indices_matrix(idx_A_k_pair, self.sorted_tensor.shape, len(k))
        mask = new_indices_unsorted != old_indices_unsorted

        self.prev_idx_A_k_pair = idx_A_k_pair
        self.previous_indices_results[self.indices_sort[mask]] = new_indices_unsorted[mask]
        indicesClosest = self.previous_indices_results

        return indicesClosest

    @staticmethod
    def _create_indices_matrix(idx_A_k_pair, matrix_shape, len_quant_points):
        old_idx = 0
        final_indices = np.zeros(matrix_shape, dtype=int)
        for idx_A, idx_k in idx_A_k_pair:
            final_indices[old_idx:idx_A] = idx_k
            old_idx = idx_A
        final_indices[old_idx:] = len_quant_points - 1
        return final_indices

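The idea is to sort the array A beforehand and then call searchsorted on the sorted A with the midpoints of k. This gives exactly the same information as before, namely which points of A are closest to which points of k. The _create_indices_matrix method builds the full index array from this information, and we then unsort it to restore the original order of A. To take advantage of a slowly changing k, we keep the previous indices and determine which of them need to change; only those are updated. For slowly changing k this gives much better performance (at a considerably higher memory cost).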
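The core of this idea, without the caching for slowly changing k, can be sketched compactly. This is a simplified variant rather than the class above: the function name `nearest_via_sorted_A` is made up for this example, np.repeat replaces the Python loop in _create_indices_matrix, and a scatter assignment replaces the gather through inv_indices_sort.

```python
import numpy as np

def nearest_via_sorted_A(order, sorted_A, k_sorted):
    """Index of the nearest element of k_sorted for each element of the original A.

    order and sorted_A are precomputed once: order = np.argsort(A), sorted_A = A[order].
    """
    n = sorted_A.size
    midpoints = (k_sorted[:-1] + k_sorted[1:]) / 2
    # Positions in sorted_A where the nearest point of k changes.
    bounds = np.searchsorted(sorted_A, midpoints)
    # Number of elements of sorted_A that fall to each point of k.
    counts = np.diff(np.concatenate(([0], bounds, [n])))
    sorted_result = np.repeat(np.arange(k_sorted.size), counts)
    # Undo the sort: position order[j] in A receives the result for sorted_A[j].
    result = np.empty(n, dtype=np.intp)
    result[order] = sorted_result
    return result

rng = np.random.default_rng(0)
A = rng.random(10_000)
order = np.argsort(A)   # preprocessing, done once because A never changes
sorted_A = A[order]

k = np.sort(rng.random(30))
idx = nearest_via_sorted_A(order, sorted_A, k)
values = k[idx]
```

Only the searchsorted call over about 30 midpoints depends on k; the rest is filling and reordering an array of the same length as A, which matches the observation that the reordering dominates the running time.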

For a random matrix A with 5 million elements and a k of about 30 elements, repeating the experiment 60 times, we get:

Function search_sorted1; 15.72285795211792s
Function search_sorted2; 13.030786037445068s
Function query; 2.3306031227111816s <- the one with use_k_optimization = True
Function query; 4.81286096572876s   <- with use_k_optimization = False

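scipy.spatial.KDTree.query was so slow that I did not time it (more than 1 minute). This is the code used for timing; it also contains the implementations of search_sorted1 and search_sorted2.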

import numpy as np
import scipy
import scipy.spatial
import time


A = np.random.rand(10000*500) #5 million elements
k = np.random.rand(32)
k.sort()

#first attempt, detailed in the answer, too
def search_sorted1(A, k):
    indicesClosest = np.searchsorted(k, A)
    flagToReduce = indicesClosest == k.shape[0]
    modifiedIndicesToAvoidOutOfBoundsException = indicesClosest.copy()
    modifiedIndicesToAvoidOutOfBoundsException[flagToReduce] -= 1

    flagToReduce = np.logical_or(flagToReduce,
                        np.abs(A-k[indicesClosest-1]) <
                        np.abs(A - k[modifiedIndicesToAvoidOutOfBoundsException]))
    flagToReduce = np.logical_and(indicesClosest > 0, flagToReduce)
    indicesClosest[flagToReduce] -= 1

    return indicesClosest

#taken from @Divakar answer linked in the comments under the question
def search_sorted2(A, k):
    indicesClosest = np.searchsorted(k, A, side="left").clip(max=k.size - 1)
    mask = (indicesClosest > 0) & \
           ((indicesClosest == len(k)) | (np.fabs(A - k[indicesClosest - 1]) < np.fabs(A - k[indicesClosest])))
    indicesClosest = indicesClosest - mask

    return indicesClosest

def kdquery1(A, k):
    # cKDTree expects 2-D input of shape (n_points, n_dims), so add a trailing axis
    d = scipy.spatial.cKDTree(k[:, None], compact_nodes=False, balanced_tree=False)
    _, indices = d.query(A[:, None])
    return indices

#After an idea on scipy mailing list
class SearchSorted:
    def __init__(self, tensor, use_k_optimization=True):

        '''
        Using this requires storing 4x the size of the tensor.
        If use_k_optimization is True, the class will assume that successive calls will be made with similar k.
        When this happens, we can cut the running time significantly by storing additional variables. If it won't be
        called with successive k, set the flag to False, as otherwise would just consume more memory for no
        good reason
        '''

        self.indices_sort = np.argsort(tensor)
        self.sorted_tensor = tensor[self.indices_sort]
        self.inv_indices_sort = np.argsort(self.indices_sort)
        self.use_k_optimization = use_k_optimization

        self.previous_indices_results = None
        self.prev_idx_A_k_pair = None

    def query(self, k):
        midpoints = k[:-1] + np.diff(k) / 2
        idx_count = np.searchsorted(self.sorted_tensor, midpoints)
        idx_A_k_pair = []
        count = 0

        old_obj = 0
        for obj in idx_count:
            if obj != old_obj:
                idx_A_k_pair.append((obj, count))
                old_obj = obj
            count += 1

        if not self.use_k_optimization or self.previous_indices_results is None:
            #creates the index matrix in the sorted case
            final_indices = self._create_indices_matrix(idx_A_k_pair, self.sorted_tensor.shape, len(k))
            #and now unsort it to match the original tensor position
            indicesClosest = final_indices[self.inv_indices_sort]
            if self.use_k_optimization:
                self.prev_idx_A_k_pair = idx_A_k_pair
                self.previous_indices_results = indicesClosest
            return indicesClosest

        old_indices_unsorted = self._create_indices_matrix(self.prev_idx_A_k_pair, self.sorted_tensor.shape, len(k))
        new_indices_unsorted = self._create_indices_matrix(idx_A_k_pair, self.sorted_tensor.shape, len(k))
        mask = new_indices_unsorted != old_indices_unsorted

        self.prev_idx_A_k_pair = idx_A_k_pair
        self.previous_indices_results[self.indices_sort[mask]] = new_indices_unsorted[mask]
        indicesClosest = self.previous_indices_results

        return indicesClosest

    @staticmethod
    def _create_indices_matrix(idx_A_k_pair, matrix_shape, len_quant_points):
        old_idx = 0
        final_indices = np.zeros(matrix_shape, dtype=int)
        for idx_A, idx_k in idx_A_k_pair:
            final_indices[old_idx:idx_A] = idx_k
            old_idx = idx_A
        final_indices[old_idx:] = len_quant_points - 1
        return final_indices

mySearchSorted = SearchSorted(A, use_k_optimization=True)
mySearchSorted2 = SearchSorted(A, use_k_optimization=False)
allFunctions = [search_sorted1, search_sorted2,
                mySearchSorted.query,
                mySearchSorted2.query]

#sanity checks: every function returns a plain index array, so compare them directly
print(np.array_equal(mySearchSorted.query(k), kdquery1(A, k)))
print(np.array_equal(mySearchSorted.query(k), search_sorted2(A, k)))
print(np.array_equal(mySearchSorted2.query(k), search_sorted2(A, k)))

if __name__== '__main__':
    num_to_average = 3
    for func in allFunctions:
        if func.__name__ == 'search_sorted3':
            indices_sort = np.argsort(A)
            sA = A[indices_sort].copy()
            inv_indices_sort = np.argsort(indices_sort)
        else:
            sA = A.copy()
        if func.__name__ != 'query':
            func_to_use = lambda x: func(sA, x)
        else:
            func_to_use = func
        k_to_use = k
        start_time = time.time()
        for idx_average in range(num_to_average):
            for idx_repeat in range(10):
                k_to_use += (2*np.random.rand(*k.shape)-1)/100 #uniform between (-1/100, 1/100)
                k_to_use.sort()
                indices = func_to_use(k_to_use)
                if func.__name__ == 'search_sorted3':
                    indices = indices[inv_indices_sort]
                val = k[indices]

        end_time = time.time()
        total_time = end_time-start_time

        print('Function {}; {}s'.format(func.__name__, total_time))

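I am sure we can still do better (I use a lot of space in the SearchSorted class, so we can probably save some). If you have any ideas for improvement, please let me know!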


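You can ease the memory pressure by specifying int8 for previous_indices_results. In my view, using self.previous_indices_results[mask] = new_indices_unsorted[self.inv_indices_sort[mask]] would improve readability, and indices_sort would no longer need to be an instance attribute. (2*np.random.rand(*k.shape)-1)/100 is a good choice: …/20 does not improve the speed, and …/500 is not much of an improvement either. - greybeard
Is int8 enough to index k, given that the whole vector has more than 256 elements? I would hope so: k (about 30 points), for example, could be indexed with a "uint5". I used to think this was wrong - interestingly, I had to convince myself that using the "forward permutation" on the left-hand side is correct, and equivalent to using the "inverse permutation" on the right-hand side. - greybeard
I had to convince myself that using the "forward permutation" on the left-hand side is correct, but I am still equally uneasy about both. - greybeard
Two more ideas: keep the indices between "queries", and for each value do a linear search in A starting from the "old index". If there is no old index, use a "fully unrolled"/"hard-coded" binary search. (If k cannot be guaranteed to have fewer than 64 entries, fall back to a regular binary search.) - greybeard
Many [...] ideas turn out inconclusive once tried (the weather invites trying out a raincoat). Linear search in interpreted Python turned out too slow to mention; for a quick JIT PoP I switched to Java - only a 3x improvement over "query false". Trying PyPy(3), or converting "query" to Java/Jython for extra "performance points", may be worth a shot. - greybeard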