Python中创建严格递增列表的最快方法

16

我希望找到在Python中实现以下操作的最高效方式:

假设我们有两个列表 ab,它们长度相等,最多包含1e7个元素。 但是,出于说明方便,我们可以考虑以下情况:

a = [2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8,10,11]
b = [1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]

目标是从a创建一个严格单调的列表a_new,其中仅使用具有相同值的样本点的第一个样本点。

必须删除a中要删除的相同索引也应在b中删除,以便最终结果将是:

a_new = [2, 3, 4, 5, 6, 7, 8, 9,10,11]
b_new = [1, 4, 5, 6, 8,10,11,12,14,15]

当然,这可以通过使用计算成本高昂的for循环来完成,但由于数据量巨大,这种方法并不适合。

非常感谢任何建议。


1
非常感谢所有的贡献,它们都比传统的for循环方法快得多。我点赞了所有的解决方案,因为我真的很喜欢它们,而且它们都能够正常工作。@piRSquared的解决方案最终被证明是最快的。因此,我将接受他的答案。 - Rickson
5个回答

14
你可以计算 a 的累计最大值,并使用 np.unique 去除重复值,同时还可以记录唯一索引以便按照相应方式对 b 进行子集操作。
a = np.array([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11])
b = np.array([1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])

a_cummax = np.maximum.accumulate(a)    
a_new, idx = np.unique(a_cummax, return_index=True)

a_new
# array([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11])

b[idx]
# array([ 1,  4,  5,  6,  8, 10, 11, 12, 14, 15])

11

使用 @juanpa.arrivillaga 的函数版本,并加入 numba 运行。

import numba

def psi(A):
    a_cummax = np.maximum.accumulate(A)
    a_new, idx = np.unique(a_cummax, return_index=True)
    return idx

def foo(arr):
    aux=np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

@numba.jit
def f(A):
    m = A[0]
    a_new, idx = [m], [0]
    for i, a in enumerate(A[1:], 1):
        if a > m:
            m = a
            a_new.append(a)
            idx.append(i)
    return idx

时间安排

%timeit f(a)
The slowest run took 5.37 times longer than the fastest. This could mean that an intermediate result is being cached.
1000000 loops, best of 3: 1.83 µs per loop

%timeit foo(a)
The slowest run took 9.41 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 6.35 µs per loop

%timeit psi(a)
The slowest run took 9.66 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 9.95 µs per loop

4
你能将时间结果以文本形式发布,而不是图片形式吗?你的答案看起来很有意思,但我很难读懂结果。 - Cornstalks
@Cornstalks 我现在正在飞机上,无法操作。然而,numba的结果为1.8微秒,hpauljs foo的结果为6.3微秒,psidoms的结果为9.9微秒。 - piRSquared
另外请注意,这并没有像我的原始解决方案那样利用单次遍历。A[1:]几乎复制了整个列表。您应该使用for i, a in enumerate(A, i)进行迭代,并使用float('-inf')技巧。 - juanpa.arrivillaga
@Cornstalks:点击该图像以查看更合理的大小。 - Ben Voigt
你能提供更大数组的时间吗?每当 %timeit 打印出警告时,你不应该(完全)相信那个时间。 - MSeifert
显示剩余2条评论

10

以下是一份原生 Python 代码,只需一次遍历即可完成:

>>> a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
>>> b = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
>>> a_new, b_new = [], []
>>> last = float('-inf')
>>> for x, y in zip(a, b):
...     if x > last:
...         last = x
...         a_new.append(x)
...         b_new.append(y)
...
>>> a_new
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
>>> b_new
[1, 4, 5, 6, 8, 10, 11, 12, 14, 15]

我很好奇它与numpy解决方案的比较情况,它们将具有类似的时间复杂度,但会对数据进行几次传递。

这里是一些时间记录。 首先,设置:

>>> small = ([2,1,2,3,4,5,4,6,5,7,8,9,8,10,11], [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15])
>>> medium = (np.random.randint(1, 10000, (10000,)), np.random.randint(1, 10000, (10000,)))
>>> large = (np.random.randint(1, 10000000, (10000000,)), np.random.randint(1, 10000000, (10000000,)))

现在来介绍这两种方法:

>>> def monotonic(a, b):
...     a_new, b_new = [], []
...     last = float('-inf')
...     for x,y in zip(a,b):
...         if x > last:
...             last = x
...             a_new.append(x)
...             b_new.append(y)
...     return a_new, b_new
...
>>> def np_monotonic(a, b):
...     a_new, idx = np.unique(np.maximum.accumulate(a), return_index=True)
...     return a_new, b[idx]
...

请注意,这些方法并不严格等价。其中一种方法停留在原始的 Python 领域,而另一种方法则停留在 numpy 数组领域。我们将比较它们的性能,假设您使用相应的数据结构(numpy.arraylist)开始:

首先,让我们考虑一个小型列表,与 OP 示例中相同。我们可以看到,针对小型数据结构,numpy 并不实际上更快,这并不令人意外:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, small; a, b = small", number=10000)
0.039130652003223076
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, small, np; a, b = np.array(small[0]), np.array(small[1])", number=10000)
0.10779813499539159

现在,我们有一个包含10,000个元素的“中等”列表/数组,此时我们开始看到 numpy 的优势:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, medium; a, b = medium[0].tolist(), medium[1].tolist()", number=10000)
4.642718859016895
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, medium; a, b = medium", number=10000)
1.3776302759943064

有趣的是,优势似乎在“大型”数组中缩小了,在1e7个元素的数量级上:

>>> timeit.timeit("monotonic(a,b)", "from __main__ import monotonic, large; a, b = large[0].tolist(), large[1].tolist()", number=10)
4.400254560023313
>>> timeit.timeit("np_monotonic(a,b)", "from __main__ import np_monotonic, large; a, b = large", number=10)
3.593393853981979

注意,在最后一组时间测试中,我只做了10次,但如果有人有更好的机器或更多耐心,请随意增加number的数量。


根据我的整体经验,C编译模块比纯Python快10到100倍,所以我打赌numpy的解决方案会更快...好奇我是否正确。希望原帖作者能提供计时结果。 - Claudio
1
@Claudio 根据我的经验,使用 numpy 数组相对于普通的 Python 列表,在像这样的简单操作中速度上的优势并不总是直观的,并且取决于你的数据结构的大小。请查看我刚添加的修改,其中展示了不同大小的数据结构的时间计算。 - juanpa.arrivillaga

10

uniquereturn_index使用argsort。使用maximum.accumulate不需要它们。因此,我们可以利用unique,并执行以下操作:

In [313]: a = [2,1,2,3,4,5,4,6,5,7,8,9,8,10,11]
In [314]: arr = np.array(a)
In [315]: aux = np.maximum.accumulate(arr)
In [316]: flag = np.concatenate(([True], aux[1:] != aux[:-1])) # key unique step
In [317]: idx = np.nonzero(flag)
In [318]: idx
Out[318]: (array([ 0,  3,  4,  5,  7,  9, 10, 11, 13, 14], dtype=int32),)
In [319]: arr[idx]
Out[319]: array([ 2,  3,  4,  5,  6,  7,  8,  9, 10, 11])
In [320]: np.array(b)[idx]
Out[320]: array([ 1,  4,  5,  6,  8, 10, 11, 12, 14, 15])

In [323]: np.unique(aux, return_index=True)[1]
Out[323]: array([ 0,  3,  4,  5,  7,  9, 10, 11, 13, 14], dtype=int32)

def foo(arr):
    aux=np.maximum.accumulate(arr)
    flag = np.concatenate(([True], aux[1:] != aux[:-1]))
    return np.nonzero(flag)[0]

In [330]: timeit foo(arr)
....
100000 loops, best of 3: 12.5 µs per loop
In [331]: timeit np.unique(np.maximum.accumulate(arr), return_index=True)[1]
....
10000 loops, best of 3: 21.5 µs per loop

有 (10000,) 形状的 medium,这个没有排序的唯一性具有显著的速度优势:

In [334]: timeit np.unique(np.maximum.accumulate(medium[0]), return_index=True)[1]
1000 loops, best of 3: 351 µs per loop
In [335]: timeit foo(medium[0])
The slowest run took 4.14 times longer ....
10000 loops, best of 3: 48.9 µs per loop

[1]: 使用np.source(np.unique)查看代码,或在IPython中使用 ??


你没有以相同的方式计时。第一种情况中存在函数调用开销。 - kmario23
@kmario23 一个函数调用不会导致这些差异。 - juanpa.arrivillaga
剩下要做的就是切片数组。np.nonzero 返回重复项的索引,就像 np.uniquereturn_index=True 一样。 - piRSquared
仅仅为了好玩,我对使用大量随机数据的 np.unique 解决方案进行了分析:cumtime: np_monotonic: 493ms cummax: 47ms unique: 433ms,其中无意义的 argsort: 292ms - Paul Panzer

0
a = [2, 1, 2, 3, 4, 5, 4, 6, 5, 7, 8, 9, 8,10,11]
b = [1, 2, 3, 4, 5, 6, 7, 8, 9,10,11,12,13,14,15]
print(sorted(set(a)))
print(sorted(set(b)))
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
#[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]

你的解决方案与问题中列出的要求不符。由于你正在回答一个三年前已有接受答案的问题,你可能需要考虑你的答案是否有价值。 - 9bO3av5fw5

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接