将排序的项目搜索到已排序序列中

3

我希望能在一个排序后的数值数组中找到一系列项目。 我知道可以使用numpy进行以下操作:

l = np.searchsorted(values, items)

这个算法的复杂度为 O(len(items)*log(len(values)))。但是,我的数据也是有序的,所以我可以用以下方法将其处理为 O(len(items)+len(values)) :

l = np.zeros(items.size, dtype=np.int32)
k, K = 0, len(values)
for i in range(len(items)):
    while k < K and values[k] < items[i]:
        k += 1
    l[i] = k

问题在于这个纯Python版本由于循环的原因比searchsorted慢得多,即使对于大的len(items)和len(values)(~10^6)。

有没有想法用numpy“向量化”这个循环?


2
你不能在每个项目中使用searchsorted,同时从上次找到的索引处切片值直到结尾吗?也许这样可以加快速度。 - PeterE
如果在values中items[i]不存在,你的方法会正确地工作吗?我认为它会指示第一个大于items[i]的值。 - Marek
2
即使你将其向量化,你仍然会停留在Python领域...而NumPy会让你大吃一惊。 - Joran Beasley
1
@JoranBeasley,“向量化”是指使用NumPy原语。 - rndblnch
1
@PeterE所建议的方法已经内置于numpy 1.9中,这个是相关的PR。 - Jaime
显示剩余3条评论
1个回答

2

一些示例数据:

# some example data
np.random.seed(0)
n_values = 1000000
n_items = 100000
values = np.random.rand(n_values)
items = np.random.rand(n_items)
values.sort()
items.sort()

您的原始代码片段以及@PeterE建议的实现:

def original(values, items):
    l = np.empty(items.size, dtype=np.int32)
    k, K = 0, len(values)
    for i, item in enumerate(items):
        while k < K and values[k] < item:
            k += 1
        l[i] = k
    return l

def peter_e(values, items):
    l = np.empty(items.size, dtype=np.int32)
    last_idx = 0
    for i, item in enumerate(items):
        last_idx += values[last_idx:].searchsorted(item)
        l[i] = last_idx
    return l

对比使用简单粗暴的np.searchsorted进行正确性测试:

ss = values.searchsorted(items)

print(all(original(values, items) == ss))
# True

print(all(peter_e(values, items) == ss))
# True

时间:

In [1]: %timeit original(values, items)
10 loops, best of 3: 115 ms per loop

In [2]: %timeit peter_e(values, items)
10 loops, best of 3: 79.8 ms per loop

In [3]: %timeit values.searchsorted(items)
100 loops, best of 3: 4.09 ms per loop

对于这样大小的输入,简单使用np.searchsorted比您原来的代码和PeterE的建议更加方便。

更新

为了避免可能会影响计时的缓存效应,我们可以为基准测试的每个迭代生成一组新的随机输入数组:

In [1]: %%timeit values = np.random.randn(n_values); items = np.random.randn(n_items); values.sort(); items.sort();
original(values, items)
   .....: 
10 loops, best of 3: 115 ms per loop

In [2]: %%timeit values = np.random.randn(n_values); items = np.random.randn(n_items); values.sort(); items.sort();
peter_e(values, items)
   .....: 
10 loops, best of 3: 79.9 ms per loop

In [3]: %%timeit values = np.random.randn(n_values); items = np.random.randn(n_items); values.sort(); items.sort();
values.searchsorted(items)
   .....: 
100 loops, best of 3: 4.08 ms per loop

更新2

编写一个Cython函数,用于在valuesitems都已排序的情况下击败np.searchsorted并不难。

search_doubly_sorted.pyx:

import numpy as np
cimport numpy as np
cimport cython

@cython.boundscheck(False)
@cython.wraparound(False)
def search_doubly_sorted(values, items):

    cdef:
        double[:] _values = values.astype(np.double)
        double[:] _items = items.astype(np.double)
        long n_items = items.shape[0]
        long n_values = values.shape[0]
        long[:] out = np.empty(n_items, dtype=np.int64)
        long ii, jj, last_idx

    last_idx = 0
    for ii in range(n_items):
        for jj in range(last_idx, n_values):
             if _items[ii] <= _values[jj]:
                break
        last_idx = jj
        out[ii] = last_idx

    return out.base

测试正确性:

In [1]: from search_doubly_sorted import search_doubly_sorted

In [2]: print(all(search_doubly_sorted(values, items) == values.searchsorted(items)))                     
# True

基准测试:

In [3]: %timeit values.searchsorted(items)
100 loops, best of 3: 4.07 ms per loop

In [4]: %timeit search_doubly_sorted(values, items)
1000 loops, best of 3: 1.44 ms per loop

性能提升相对较小。除非这是您代码中的严重瓶颈,否则您应该坚持使用np.searchsorted


另外,您能否将其与仅使用 values.searchsort(items) 进行计时(和可能验证)? - PeterE
感谢这些时间数据,但我认为要看到显著的差异,数组的大小应该真正更大(然后np.searchsorted基准测试才会非常快)。我的问题实际上是Python开销(隐藏在大O的常数因子中)非常大。 - rndblnch
@rndblnch,现在已经更新了更大输入的时间。np.searchsorted仍然比其他两种方法更快。 - ali_m
是的,我得到了类似的结果,但我担心警告可能会影响计时。 - PeterE
1
我已经进行了性能分析,发现瓶颈在这里。由于我正在处理非常大的数组,因此大O复杂度不仅是理论上的问题。使用Cython优化后的版本速度确实更快。 - rndblnch
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接