多个已排序列表的最快合并去重方法,返回有序结果。

5
当有一个列表的列表,其中所有的子列表都是有序的,例如: [[1,3,7,20,31], [1,2,5,6,7], [2,4,25,26]] 什么是最快的方法来获取这些列表的并集,而不重复,并且得到一个有序的结果? 因此,结果列表应为: [1,2,3,4,5,6,7,20,25,26,31] 。 我知道我可以将它们全部联合起来,然后排序,但是Python中是否内置了更快的方法(例如:在执行联合时进行排序)?
编辑:
所提出的答案是否比对所有子列表进行成对操作的算法更快?

解决方案. UNION(x, y)
1 答案<-()
2 当 x != NIL 且 y != NIL 时
3 如果 docID(x) = docID(y)
4 则将 docID(x) 添加到答案中
5 x<-next(x)
6 y<-next(y)
7 否则如果 docID(x)<docID(y)
8 则将 docID(x) 添加到答案中
9 x<-next(x)
10 否则将 docID(y) 添加到答案中
11 y<-next(y)
12 返回答案


请用文字转录替换图片。 - norok2
3个回答

5
你可以使用 heapq.merge 来实现这个功能:
from heapq import merge

def mymerge(v):
    last = None
    for a in merge(*v):
        if a != last:  # remove duplicates
            last = a
            yield a

print(list(mymerge([[1,3,7,20,31], [1,2,5,6,7], [2,4,25,26]])))
# [1, 2, 3, 4, 5, 6, 7, 20, 25, 26, 31]

2
哪里不符合 Pythonic 风格? - blhsing
2
@AlexandrShurigin 这不是Pythonic吗?这是标准库的一个很好的用法,也是生成器的优雅用法。我认为这非常符合Pythonic的风格。 - juanpa.arrivillaga
1
@AlexandrShurigin说“看起来很丑”?它看起来很好啊。简单、自我说明和Pythonic。而且它恰好是正确的... - juanpa.arrivillaga
2
你能否提供一些时间方面的信息,以回答关于“最快”部分的问题? :-) - norok2
2
你可以使用 itertools.groupby 来消除重复项 - [x for x, _ in itertools.groupby(merge(*v))] 是一条语句。 - kaya3
显示剩余8条评论

2

(编辑)

该问题的渐进理论最佳方法是使用优先队列,例如在heapq.merge()中实现的方法(感谢@kaya3指出)。

然而,在实践中,很多事情可能会出错。例如,复杂度分析中的常数因子足够大,以至于理论上最优的方法在现实场景中更慢。 这基本上取决于实现方式。 例如,Python在显式循环方面会受到一些速度惩罚。

因此,让我们考虑几种方法以及它们在某些具体输入下的表现。

方法

为了给您一些我们正在讨论的数字的概念,这里有一些方法:

  • merge_sorted()使用最简单的方法,即展平序列,将其缩减到一个set()(删除重复项),并按需排序
import itertools


def merge_sorted(seqs):
    return sorted(set(itertools.chain.from_iterable(seqs)))

  • merge_heapq() 实际上是 @arshajii 的答案。请注意,itertools.groupby() 变体略微(不到 ~1%)更快。
import heapq


def i_merge_heapq(seqs):
    last_item = None
    for item in heapq.merge(*seqs):
        if item != last_item:
            yield item
            last_item = item

def merge_heapq(seqs):
    return list(i_merge_heapq(seqs))

  • merge_bisect_set()是与merge_sorted()基本相同的算法,只不过结果现在使用高效的bisect模块进行排序插入而显式构造。由于sorted()基本上在Python中循环执行相同的操作,因此这并不会更快。
import itertools
import bisect


def merge_bisect_set(seqs):
    result = []
    for item in set(itertools.chain.from_iterable(seqs)):
        bisect.insort(result, item)
    return result

  • merge_bisect_cond()类似于merge_bisect_set(),但现在使用最终的list明确执行非重复约束条件。然而,这比仅使用set()要昂贵得多(实际上它太慢了,被排除在图表之外)。
def merge_bisect_cond(seqs):
    result = []
    for item in itertools.chain.from_iterable(seqs):
        if item not in result:
            bisect.insort(result, item)
    return result

  • merge_pairwise() 显式地实现了理论上高效的算法,类似于您在问题中概述的算法。
def join_sorted(seq1, seq2):
    result = []
    i = j = 0
    len1, len2 = len(seq1), len(seq2)
    while i < len1 and j < len2:
        if seq1[i] < seq2[j]:
            result.append(seq1[i])
            i += 1
        elif seq1[i] > seq2[j]:
            result.append(seq2[j])
            j += 1
        else:  # seq1[i] == seq2[j]
            result.append(seq1[i])
            i += 1
            j += 1
    if i < len1:
        result.extend(seq1[i:])
    elif j < len2:
        result.extend(seq2[j:])
    return result


def merge_pairwise(seqs):
    result = []
    for seq in seqs:
        result = join_sorted(result, seq)
    return result

  • merge_loop() 实现了上述方法的一般化,现在只需对所有序列执行一次 pass,而不是成对执行。
def merge_loop(seqs):
    result = []
    lengths = list(map(len, seqs))
    idxs = [0] * len(seqs)
    while any(idx < length for idx, length in zip(idxs, lengths)):
        item = min(
            seq[idx]
            for idx, seq, length in zip(idxs, seqs, lengths) if idx < length)
        result.append(item)
        for i, (idx, seq, length) in enumerate(zip(idxs, seqs, lengths)):
            if idx < length and seq[idx] == item:
                idxs[i] += 1
    return result

基准测试

通过使用以下方法生成输入:

def gen_input(n, m=100, a=None, b=None):
    if a is None and b is None:
        b = 2 * n * m
        a = -b
    return tuple(tuple(sorted(set(random.randint(int(a), int(b)) for _ in range(n)))) for __ in range(m))

对于不同的n(每个序列的大小)和m(序列的数量),以及ab(生成的最小和最大数)的不同值,性能通常会有所变化。 为简洁起见,本答案未进行探索,但可以在此处进行尝试,其中还包括一些其他实现,尤其是使用Cython进行的一些试图加速,但只部分成功。

可以绘制不同n的时间:

bm_full bm_zoom


@kaya3 好的,我误解了你的 N。我考虑的是长度为 Nm 个列表的情况。如果 N 是元素的总数,则 heapq.merge 应该是 O(2 N log N)(一次遍历构建堆,一次遍历拉取它)。相反,成对连接的时间复杂度为 O((N / m) * m²),因此为 O(N m),只要 m < 2 log N,那么实际上更有效率。 - norok2
1
不,heapq.merge并不会一次性将所有N个元素插入堆中;它只需要从每个序列中获取当前最小的m个元素。你可以查看源代码:它构建了一个大小为m的堆,然后对每个元素调用_heapreplace方法,该方法既进行插入又进行删除操作,使得堆的大小保持为m。https://github.com/python/cpython/blob/master/Lib/heapq.py#L314 - kaya3
@kaya3 好的,但是如果 m < log(N) 的话,理论上成对连接会更快。 - norok2
1
不,成对连接的时间复杂度为 O(Nm),而 heapq.merge 的时间复杂度为 O(N log m)。后者在渐近意义下始终更好。 - kaya3
1
好的,我之前忽略了 heapq.merge 的内部堆大小为 m。现在我确信了,没错 :-) - norok2
显示剩余4条评论

-2

在Python-3中,您可以使用集合。

mylist = [[1,3,7,20,31], [1,2,5,6,7], [2,4,25,26]]

mynewlist = mylist[0] + mylist[1] + mylist[2]

print(list(set(mynewlist)))

输出:

[1, 2, 3, 4, 5, 6, 7, 20, 25, 26, 31]

首先使用列表加法合并所有子列表。

然后将其转换为集合对象,在其中删除所有重复项,这些项也将按升序排序。

将其转换回列表。它会给出您想要的输出。

希望它能回答您的问题。


这非常低效,是二次时间复杂度(不要使用串联来合并列表),而且它是不正确的,因为它没有排序。 - juanpa.arrivillaga
我认为你可以通过将给定的列表解包到集合构造函数中来完成,但你仍然需要将解包后的列表转换为单个迭代器。然后将最终语句中的“list”更改为返回列表的sorted - mgrollins

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接