通过应用传递闭包创建一个唯一数字列表

3

我有一个元组列表(每个元组包含2个数字),例如:

array = [(1, 2), (1, 3), (2, 4), (5, 8), (8, 10)]

假设这些数字是一些数据库对象(记录)的ID,元组中有重复对象的ID。这意味着1和2是重复的,1和3也是重复的,这意味着2和3也是重复的。
如果a == b且b == c,则a == c。
现在我想将所有这些重复对象的ID合并为一个元组,如下所示:
output = [(1, 2, 3, 4), (5, 8, 10)]

我知道可以使用循环和冗余匹配来完成这个任务。但我希望有更好的解决方案,能够在计算上更加轻量级(如果有的话)。


1
你将永远无法摆脱至少一个循环来完成这个任务。而且可能需要多个循环。但是,你可以使用不相交集数据结构使其更加高效:https://en.wikipedia.org/wiki/Disjoint-set_data_structure - Willem Van Onsem
为什么不展示你使用循环的尝试呢?这可以成为其他人改进的良好起点。 - Chris_Rands
4个回答

4
您可以使用一种数据结构使合并操作更加高效。在此过程中,您需要创建某种相反的树形结构。因此,在您的示例中,您首先将创建列出的数字:
1  2  3  4  5  8  10

现在,如果您遍历元组(1,2),则会在某种字典中查找12。您搜索它们的祖先(这里没有),然后创建某种合并节点
1  2  3  4  5  8  10
 \/
 12

下一步,我们合并(1,3),因此我们查找1的祖先(12)和3的祖先(3),然后执行另一个合并操作:
1  2  3  4  5  8  10
 \/   |
 12  /
   \/
  123

下一步我们合并 (2,4)(5,8)(8,10):
1  2  3  4  5  8  10
 \/   |  |   \/   |
 12  /   |   58  /
   \/   /      \/
  123  /      5810
     \/
    1234

您还需要维护一个“合并头”列表,以便轻松返回元素。

开始动手

现在我们知道如何构建这样的数据结构了,让我们来实现它。首先,我们定义一个节点:

class Merge:

    def __init__(self,value=None,parent=None,subs=()):
        self.value = value
        self.parent = parent
        self.subs = subs

    def get_ancestor(self):
        cur = self
        while cur.parent is not None:
            cur = cur.parent
        return cur

    def __iter__(self):
        if self.value is not None:
            yield self.value
        elif self.subs:
            for sub in self.subs:
                for val in sub:
                    yield val

现在我们首先为列表中的每个元素初始化一个字典:
vals = set(x for tup in array for x in tup)

vals中的每个元素创建一个字典,将其映射到Merge

dic = {val:Merge(val) for val in vals}

还有 merge_heads

merge_heads = set(dic.values())

现在对于数组中的每个元组,我们查找相应的祖先Merge对象,我们在其上面创建一个新的Merge,从merge_head集合中删除两个旧的头,并将新的merge添加到其中:

for frm,to in array:
    mra = dic[frm].get_ancestor()
    mrb = dic[to].get_ancestor()
    mr = Merge(subs=(mra,mrb))
    mra.parent = mr
    mrb.parent = mr
    merge_heads.remove(mra)
    merge_heads.remove(mrb)
    merge_heads.add(mr)

最后,在完成这一步骤后,我们可以简单地为merge_heads中的每个Merge构建一个set:

resulting_sets = [set(merge) for merge in merge_heads]

然后resulting_sets将会是(顺序可能不同):

[{1, 2, 3, 4}, {8, 10, 5}]

将所有内容组合在一起(不包括class定义):
vals = set(x for tup in array for x in tup)
dic = {val:Merge(val) for val in vals}
merge_heads = set(dic.values())
for frm,to in array:
    mra = dic[frm].get_ancestor()
    mrb = dic[to].get_ancestor()
    mr = Merge(subs=(mra,mrb))
    mra.parent = mr
    mrb.parent = mr
    merge_heads.remove(mra)
    merge_heads.remove(mrb)
    merge_heads.add(mr)
resulting_sets = [set(merge) for merge in merge_heads]

这将最坏情况下以O(n2)运行,但您可以使树平衡,使祖先在O(log n)中找到,从而使其变为O(n log n)。此外,您还可以短路祖先列表,使其更快。


2
你可以使用不相交集合。
不相交集合实际上是一种树形结构。我们将每个数字视为一个树节点,每次读入一个边缘 (u, v) 时,我们只需通过将一个树的根节点指向另一个树(如果不存在,则创建一个单节点树)来轻松关联两个树 uv。最后,我们只需遍历整个森林以获取结果。
from collections import defaultdict


def relation(array):

    mapping = {}

    def parent(u):
        if mapping[u] == u:
            return u
        mapping[u] = parent(mapping[u])
        return mapping[u]

    for u, v in array:
        if u not in mapping:
            mapping[u] = u
        if v not in mapping:
            mapping[v] = v
        mapping[parent(u)] = parent(v)

    results = defaultdict(set)

    for u in mapping.keys():
        results[parent(u)].add(u)

    return [tuple(x) for x in results.values()]

在上面的代码中,mapping[u] 存储节点 u 的祖先(父节点或根节点)。特别地,对于单个节点的树的节点,其祖先是它本身。

据我所知,这并不完全可行:您需要进行比较,并始终将较小的节点合并到较大的节点中。 - Willem Van Onsem
@WillemVanOnsem,你能提供一个反例吗? - hsfzxjy
没事,我心里想的数据结构稍微有点不同。 - Willem Van Onsem

1
请参考我对Moinuddin答案的评论:这个被接受的答案并没有通过你可以在http://rosettacode.org/wiki/Set_consolidation#Python找到的测试。虽然我没有深入研究。
基于Willem的答案,我想提出一个新的建议。 这个建议中的问题在于get_ancestor调用中的递归:每次被问及我们的祖先时,为什么我们要向上爬树,而不是只记住最后找到的根节点(如果它改变了,仍然可以从那个点向上爬)。事实上,Willem的算法不是线性的(大约是nlogn或n²),而我们可以轻松地消除这种非线性。
另一个问题来自迭代器:如果树太深(在我的用例中遇到了这个问题),你会在迭代器内部得到一个Python异常(Too much recursion)。因此,我们应该合并子叶(而不是构建完整的树),并且我们应该用N个叶子构建分支,而不是只有两个叶子的分支。
我的代码版本如下:
class Merge:

    def __init__(self,value=None,parent=None,subs=None):
        self.value = value
        self.parent = parent
        self.subs = subs
        self.root = None
        if self.subs:
            subs_a,subs_b = self.subs
            if subs_a.subs:
                subs_a = subs_a.subs
            else:
                subs_a = [subs_a]
            if subs_b.subs:
                subs_b = subs_b.subs
            else:
                subs_b = [subs_b]
            self.subs = subs_a+subs_b

            for s in self.subs:
                s.parent = self
                s.root = None
    def get_ancestor(self):
        cur = self if self.root is None else self.root
        while cur.parent is not None:
            cur = cur.parent
        if cur != self:
            self.root = cur
        return cur

    def __iter__(self):
        if self.value is not None:
            yield self.value
        elif self.subs:
            for sub in self.subs:
                for val in sub:
                    yield val
def treeconsolidate(array):
    vals = set(x for tup in array for x in tup)
    dic = {val:Merge(val) for val in vals}
    merge_heads = set(dic.values())
    for settomerge in array:
        frm = settomerge.pop()
        for to in settomerge:
            mra = dic[frm].get_ancestor()
            mrb = dic[to].get_ancestor()
            if mra == mrb:
                continue
            mr = Merge(subs=[mra,mrb])
            merge_heads.remove(mra)
            merge_heads.remove(mrb)
            merge_heads.add(mr)
    resulting_sets = [set(merge) for merge in merge_heads]
    return resulting_sets

在小的合并中,这不会改变很多东西,但我的经验表明,在包含许多元素的大型集合中爬树可能会花费很多时间:在我的情况下,我必须处理100k个集合,每个集合包含2到1000个元素,每个元素可能出现在1到1000个集合中...

-1

我认为实现这个的最有效方法将是使用 set,如下:

def transitive_cloure(array):
    new_list = [set(array.pop(0))]  # initialize first set with value of index `0`

    for item in array:
        for i, s in enumerate(new_list):
            if any(x in s for x in item):
                new_list[i] = new_list[i].union(item)
                break
        else:
            new_list.append(set(item))
    return new_list

示例运行:

>>> transitive_cloure([(1,2), (1,3), (2,4), (5,8), (8,10)])
[{1, 2, 3, 4}, {8, 10, 5}]

与其他答案的比较(基于Python 3.4):

  • 这个答案是:6.238126921001822

    >>> timeit.timeit("moin()", setup="from __main__ import moin")
    6.238126921001822
    
  • Willem的解决方案:29.115453064994654 (与类的声明相关的时间被排除在外)

    >>> timeit.timeit("willem()", setup="from __main__ import willem")
    29.115453064994654
    
  • hsfzxjy的解决方案:10.049749890022213

    >>> timeit.timeit("hsfzxjy()", setup="from __main__ import hsfzxjy")
    10.049749890022213
    

1
谢谢你,Moin。我认为你的代码简短而快速。我也尝试了不同的值。 - Zohaib Ijaz
这个实现似乎有问题: 请参见http://rosettacode.org/wiki/Set_consolidation#Python中的测试。特别是以下测试失败:transitive_cloure([{A,B}, {C,D}, {D,B}])应该返回[{'A', 'C', 'B', 'D'}],但实际上返回了{{'C', 'D'}, {'A', 'B', 'D'}}。 - Goulou
这个答案对于transitive_cloure([(1,2),(3,4),(2,3)])这样的情况不起作用,有很多类似的情况。 - amit thakur

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接