高效地对两个预排序的numpy数组进行argsort

4

我有几组数组。在每一组中,所有的数组都是一维数组,并且长度相同。在每一组中都有一个主要数组,它已经排序好了。

例如:

grp_1 = [
    np.array([10, 20, 30, 40]),
    np.array(["A", "C", "E", "G"]),
    ]

grp_2 = [
    np.array([15, 25, 35]),
    np.array(["Z", "Y", "X"]),
    ]

现在我想把我的组中对应的元素合并起来。我希望这样做可以使结果的主数组以稳定的方式进行排序。例如:

def combine_groups(groups):
    combined_arrays = [np.concatenate([grp[idx] for grp in groups]) for idx in range(len(groups[0]))]
    sort_indices = np.argsort(combined_arrays[0], kind="mergesort")
    # Merge sort rather than quicksort because the former is stable
    return [arr[sort_indices] for arr in combined_arrays]

这段代码是可行的,但对于比此示例大得多的数组来说,速度远不如其应有的快。归并排序的时间复杂度为 O(N log(N)),而合并已经排序的数组应该是一个 O(N) 的事情。
我发现了 cytoolz 包,它有一个 merge_sorted 包,当涉及到对我的主数组进行排序时,它要比 numpy 更加出色。不幸的是,我需要获取结果索引,以便我也可以转换非主数组。
因此:是否有比使用numpy的 argsort 更快的方法呢?
1个回答

2

简而言之

就像你正在做的那样,使用归并排序即可。先前类似问题的讨论基准测试都表明,如果不自己编写Cython代码(甚至可能即使这样也不行),你不可能超越你已经在使用的方法。

不使用归并排序的方法

只需将您的组合并,然后使用cytoolz.merge_sorted

from cytoolz import merge_sorted

# it will be an iterator that yields (10, 'A'), (15, 'Z'), (20, 'C'), (25, 'Y'), (30, 'E'), (35, 'X'), (40, 'G')
it = merge_sorted(zip(*grp_1), zip(*grp_2))

# unzip the tuples back into your desired arrays
grp_comb = [np.array(d) for d in zip(*it)]
print(grp_comb)

输出:

[array([10, 15, 20, 25, 30, 35, 40]), array(['A', 'Z', 'C', 'Y', 'E', 'X', 'G'], dtype='<U1')]

或者,如果您真的想通过类似于 numpy.argsort 的间接排序来合并您的组,您可以使用 ndarray.searchsorted:
ix = grp_1[0].searchsorted(grp_2[0])
grp_comb= [np.insert(grp_1[i], ix, grp_2[i]) for i in range(2)]
print(grp_comb)

输出:

[array([10, 15, 20, 25, 30, 35, 40]), array(['A', 'Z', 'C', 'Y', 'E', 'X', 'G'], dtype='<U1')]

测试/计时

我使用以下代码测试我的答案是否产生与您发布的combine_groups函数相同的输出,并对各种方法进行计时:

from cytoolz import merge_sorted
import numpy as np
from numpy.testing import assert_array_equal

grp_1 = [
    np.array([10, 20, 30, 40]),
    np.array(["A", "C", "E", "G"]),
    ]

grp_2 = [
    np.array([15, 25, 35]),
    np.array(["Z", "Y", "X"]),
    ]

def combine_groups(*groups):
    combined_arrays = [np.concatenate([grp[idx] for grp in groups]) for idx in range(len(groups[0]))]
    sort_indices = np.argsort(combined_arrays[0], kind="mergesort")
    # Merge sort rather than quicksort because the former is stable
    return [arr[sort_indices] for arr in combined_arrays]

def combine_groups_ms(*groups):
    it = merge_sorted(*(zip(*g) for g in groups))
    return [np.array(d) for d in zip(*it)]

def combine_groups_ssi(g0, g1):
    ix = g0[0].searchsorted(g1[0])
    return [np.insert(g0[i], ix, g1[i]) for i in range(len(g0))]

expected = combine_groups(grp_1, grp_2)
assert_array_equal(combine_groups_ms(grp_1, grp_2), expected)
assert_array_equal(combine_groups_ssi(grp_1, grp_2), expected)

这是时间安排:
%%timeit
combine_groups(grp_1, grp_2)
6.84 µs ± 154 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%%timeit
combine_groups_ms(grp_1, grp_2)
10.4 µs ± 249 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%%timeit
combine_groups_ssi(grp_1, grp_2)
36.3 µs ± 1.27 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

所以,你最初使用连接后跟归并排序的尝试实际上比我编写的直接利用预排序性能更快的代码要快。类似的问题已经在SO上提问过,并且产生了类似的基准测试结果。通过查看归并排序算法的细节,我认为这可能是因为合并两个已排序列表只需要一步就可以达到归并排序的最佳情况性能。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接