匹配集合的数据结构

15

我有一个应用程序,其中有多个集合,一个集合可能是
{4, 7, 12, 18}
独特的数字,并且都小于50。

然后我有几个数据项:
1 {1, 2, 4, 7, 8, 12, 18, 23, 29}
2 {3, 4, 6, 7, 15, 23, 34, 38}
3 {4, 7, 12, 18}
4 {1, 4, 7, 12, 13, 14, 15, 16, 17, 18}
5 {2, 4, 6, 7, 13, 15}

数据项1、3和4与该集合匹配,因为它们包含集合中的所有项。

我需要设计一种数据结构,它可以很快地识别出数据项是集合的成员是否包含所有作为集合一部分的成员(因此数据项是集合的超集)。目前最好的估计是将少于50,000个集合。

我的当前实现方式使用64位无符号整数存储我的集合和数据,并将集合存储在列表中。然后,我通过迭代列表执行((set & data) == set)比较来检查数据项。它有效并且空间效率高,但是速度较慢(O(n)),我愿意为一些性能牺牲一些内存。有没有更好的想法如何组织它?

编辑: 非常感谢所有答案。看起来我需要提供有关问题的更多信息。我首先获取集合,然后逐个获取数据项。我需要检查数据项是否与其中一个集合匹配。
集合很可能是'稠密的',例如对于给定的问题1、3和9可能包含在95%的集合中;我可以事先某种程度上预测这一点(但不太准确)。

对于那些提出备忘录技术的人:这是备忘录函数的数据结构。集合代表已经计算过的通用解决方案,而数据项则是函数的新输入。通过将数据项与通用解决方案进行匹配,我可以避免大量的处理。


你有这方面的样本数据吗?听起来很有趣,可以试着玩一下。 - Stephen
你是在搜索一个数据项集合以找到其中包含集合元素的项,还是被给定一个特定的数据项并被要求判断它是否包含集合中的所有元素,或者是被给定一个数据项并将其与许多不同的集合进行比较? - Ken Bloom
1
(如果是前者,那么你所做的就是搜索引擎所做的事情。) - Ken Bloom
13个回答

9
我看到了另一种解决方案,与你的方案相反(即对每个集合测试数据项),使用二叉树,在其中每个节点测试特定项是否包含在内。
例如,如果您有集合 A = { 2, 3 },B = { 4 } 和 C = { 1, 3 },则会得到以下树形结构。
                      _NOT_HAVE_[1]___HAVE____
                      |                      |            
                _____[2]_____          _____[2]_____
                |           |          |           |
             __[3]__     __[3]__    __[3]__     __[3]__
             |     |     |     |    |     |     |     |
            [4]   [4]   [4]   [4]  [4]   [4]   [4]   [4]
            / \   / \   / \   / \  / \   / \   / \   / \
           .   B .   B .   B .   B    B C   B A   A A   A
                                            C     B C   B
                                                        C

制作完树结构后,你只需要进行50次比较——或者你可以在一个集合中拥有多少个项就进行多少次比较。
例如,对于{1,4},你通过树向右分支(该集合包含1),向左分支(不包含2),向左分支,向右分支,最终得到[B],这意味着仅集合B包含在{1,4}中。
这基本上被称为“二进制决策图”。如果你对节点中的冗余感到不满(因为2^50是很多节点......),那么你应该考虑简化形式,即所谓的“简化有序二进制决策图”,它是一种常用的数据结构。在这个版本中,当节点冗余时它们被合并,你将不再拥有二叉树,而是有一个有向无环图。 Wikipedia page on ROBBDs可以为你提供更多信息,以及实现此数据结构的各种语言库的链接。

你说得对,这可能是内存/性能权衡的全部范围。我的计算表明,基本BDT需要2^51 = 2,251,799,813,685,248个节点,即使像堆一样构造它也会使用2 PB的内存。我对ROBDD很感兴趣,我会去阅读相关资料。 - Daniel
1
Jérémie,感谢你的指引!我的意思是,丹尼尔可能很不幸,他的BDD压缩效果不太好;有没有什么经验法则可以用来判断哪些BDD适合压缩?我的意思是,对于文本文件,它是“某种字符比其他字符使用更频繁||多次重复”,但对于BDD呢? - Heinrich Apfelmus
2
顺便提一下,你的示例图混淆了超级集和子集。问题不是很清楚,但我认为丹尼尔的意思是查询是存储集合中的一个子集,而不是反过来。 - Heinrich Apfelmus
我相信他存储了集合A、B、C,并希望能够查询给定的集合X(他称之为“数据项”)是否是已存储的集合A、B、C中任何一个超集。但除此之外,超/子集是同一件事情:正如您所知道的,如果非B包含于非A,则A包含于B。我的意思是,您可以通过反转所有测试来实现从一个到另一个的转换。 - Jérémie
关于压缩:我不知道是否有类似的布尔函数经验法则。 ROBDD被认为是存储布尔函数最有效的方式之一 - 当您想要操作它时。 必须有某种分布分析(如果没有:这将是一个有趣的话题)。 一个重要因素是ROBDD是有序的,而且事实证明节点/问题的顺序影响可能的缩减。 (对不起,我的回答很简短,但在这些评论中你不能写太多!! :-)) - Jérémie
显示剩余4条评论

2
我不能证明,但我相当确定没有解决方案可以轻松地打破O(n)的限制。你的问题是“太一般了”:每个集合都有m = 50个属性(即,属性k是它包含数字k),而且所有这些属性都是相互独立的。没有任何聪明的属性组合可以预测其他属性的存在。排序不起作用,因为问题非常对称,你的50个数字的任何排列都会给出相同的问题,但会破坏任何排序。除非你的输入具有“隐藏结构”,否则你就没戏了。
然而,有一些速度/内存折衷的余地。也就是说,你可以预先计算小查询的答案。让Q成为一个查询集,supersets(Q)成为包含Q的集合的集合,即你问题的解决方案。那么,你的问题具有以下关键属性
Q ⊆ P  =>  supersets(Q) ⊇ supersets(P)

换句话说,对于 P = {1,3,4} 的结果是 Q = {1,3} 的结果的一个子集。
现在,预处理所有小查询的答案。为了演示,我们来看看所有大小小于等于3的查询,您将得到一个表格。
supersets({1})
supersets({2})
...
supersets({50})
supersets({1,2})
supersets({2,3})
...
supersets({1,2,3})
supersets({1,2,4})
...

supersets({48,49,50})

这个算法中有O(m^3)个条目。比如要计算supersets({1,2,3,4}),你需要查找superset({1,2,3})并在这个集合上运行线性算法。关键是平均而言,superset({1,2,3})将不包含全部的n=50,000个元素,而只有其中的一部分,即n/2^3=6250个元素,从而提高了8倍的速度。

(这是其他答案所建议的“反向索引”方法的一般化)

根据你的数据集,内存使用会非常可怕。但是你可以通过注意到像{1,2,3,4}这样的查询可以从几个不同的预计算答案中计算出来,例如supersets({1,2,3})supersets({1,2,4}),并且你将使用其中最小的一个来省略一些行或加快算法。


我倾向于你的想法,即在一般情况下,O(n) 是无法被超越的。 - TechNeilogy
你无法证明它因为它不是真的:在指针机模型中,有无限空间的情况下,O(#不同数字)是微不足道的。 - user382751
啊,你可以存储所有结果,但这有点退步了。一个合理的空间限制,例如“多项式关于不同数字的数量”,将排除此解决方案。 - Heinrich Apfelmus

1

与搜索条件匹配的集合的索引类似于集合本身。我们不再拥有小于50的唯一索引,而是拥有小于50000的唯一索引。由于您不介意使用一些内存,因此可以预先计算出一个50个元素的数组,其中包含50000位整数的匹配集合。然后,您可以索引预计算的匹配项,并基本上只需在表示匹配集合的50000位数字上执行((set & data) == set)操作。这就是我的意思。

#include <iostream>

enum
{
    max_sets = 50000, // should be >= 64
    num_boxes = max_sets / 64 + 1,
    max_entry = 50
};

uint64_t sets_containing[max_entry][num_boxes];

#define _(x) (uint64_t(1) << x)

uint64_t sets[] =
{
    _(1) | _(2) | _(4) | _(7) | _(8) | _(12) | _(18) | _(23) | _(29),
    _(3) | _(4) | _(6) | _(7) | _(15) | _(23) | _(34) | _(38),
    _(4) | _(7) | _(12) | _(18),
    _(1) | _(4) | _(7) | _(12) | _(13) | _(14) | _(15) | _(16) | _(17) | _(18),
    _(2) | _(4) | _(6) | _(7) | _(13) | _(15),
    0,
};

void big_and_equals(uint64_t lhs[num_boxes], uint64_t rhs[num_boxes])
{
    static int comparison_counter = 0;
    for (int i = 0; i < num_boxes; ++i, ++comparison_counter)
    {
        lhs[i] &= rhs[i];
    }
    std::cout
        << "performed "
        << comparison_counter
        << " comparisons"
        << std::endl;
}

int main()
{
    // Precompute matches
    memset(sets_containing, 0, sizeof(uint64_t) * max_entry * num_boxes);

    int set_number = 0;
    for (uint64_t* p = &sets[0]; *p; ++p, ++set_number)
    {
        int entry = 0;
        for (uint64_t set = *p; set; set >>= 1, ++entry)
        {
            if (set & 1)
            {
                std::cout
                    << "sets_containing["
                    << entry
                    << "]["
                    << (set_number / 64)
                    << "] gets bit "
                    << set_number % 64
                    << std::endl;

                uint64_t& flag_location =
                    sets_containing[entry][set_number / 64];

                flag_location |= _(set_number % 64);
            }
        }
    }

    // Perform search for a key
    int key[] = {4, 7, 12, 18};
    uint64_t answer[num_boxes];
    memset(answer, 0xff, sizeof(uint64_t) * num_boxes);

    for (int i = 0; i < sizeof(key) / sizeof(key[0]); ++i)
    {
        big_and_equals(answer, sets_containing[key[i]]);
    }

    // Display the matches
    for (int set_number = 0; set_number < max_sets; ++set_number)
    {
        if (answer[set_number / 64] & _(set_number % 64))
        {
            std::cout
                << "set "
                << set_number
                << " matches"
                << std::endl;
        }
    }

    return 0;
}

运行此程序会产生以下结果:
sets_containing[1][0] gets bit 0
sets_containing[2][0] gets bit 0
sets_containing[4][0] gets bit 0
sets_containing[7][0] gets bit 0
sets_containing[8][0] gets bit 0
sets_containing[12][0] gets bit 0
sets_containing[18][0] gets bit 0
sets_containing[23][0] gets bit 0
sets_containing[29][0] gets bit 0
sets_containing[3][0] gets bit 1
sets_containing[4][0] gets bit 1
sets_containing[6][0] gets bit 1
sets_containing[7][0] gets bit 1
sets_containing[15][0] gets bit 1
sets_containing[23][0] gets bit 1
sets_containing[34][0] gets bit 1
sets_containing[38][0] gets bit 1
sets_containing[4][0] gets bit 2
sets_containing[7][0] gets bit 2
sets_containing[12][0] gets bit 2
sets_containing[18][0] gets bit 2
sets_containing[1][0] gets bit 3
sets_containing[4][0] gets bit 3
sets_containing[7][0] gets bit 3
sets_containing[12][0] gets bit 3
sets_containing[13][0] gets bit 3
sets_containing[14][0] gets bit 3
sets_containing[15][0] gets bit 3
sets_containing[16][0] gets bit 3
sets_containing[17][0] gets bit 3
sets_containing[18][0] gets bit 3
sets_containing[2][0] gets bit 4
sets_containing[4][0] gets bit 4
sets_containing[6][0] gets bit 4
sets_containing[7][0] gets bit 4
sets_containing[13][0] gets bit 4
sets_containing[15][0] gets bit 4
performed 782 comparisons
performed 1564 comparisons
performed 2346 comparisons
performed 3128 comparisons
set 0 matches
set 2 matches
set 3 matches

3128个uint64_t比较赢过50000次比较。即使在最坏的情况下,也就是一个具有50个条目的密钥,您只需要执行num_boxes * max_entry次比较,在这种情况下为39100,仍然比50000好。


1

您可以使用数据项的倒排索引。以您的示例为例:

1 {1, 2, 4, 7, 8, 12, 18, 23, 29}
2 {3, 4, 6, 7, 15, 23, 34, 38}
3 {4, 7, 12, 18}
4 {1, 4, 7, 12, 13, 14, 15, 16, 17, 18}
5 {2, 4, 6, 7, 13, 15}

倒排索引将是:

1: {1, 4}
2: {1, 5}
3: {2}
4: {1, 2, 3, 4, 5}
5: {}
6: {2, 5}
...

因此,对于任何特定的集合{x_0,x_1,...,x_i},您需要交集x_0,x_1和其他集合。例如,对于集合{2,3,4},您需要将{1,5}{2}{1,2,3,4,5}相交。因为您可以将所有集合按反向索引排序,所以可以在要相交的集合长度的最小值中相交集合。

这里可能会有一个问题,如果您有非常“受欢迎”的项目(如我们示例中的4)具有巨大的索引集。

关于相交的一些话。您可以在反向索引中使用排序列表,并按递增长度顺序成对相交集合。或者,由于您不超过50K个项目,因此可以使用压缩的位集(每个数字约为6Kb,对于稀疏位集更少,约为50个数字,不那么贪婪),并按位相交位集。对于稀疏位集,我认为这将是有效的。


1

如果你想要提高性能,你需要做一些花哨的事情来减少你所进行的集合比较次数。

也许你可以将数据项分区,使得所有最小元素为1的数据项在一个组中,而所有最小元素为2的数据项在另一个组中,以此类推。

当进行搜索时,你可以找到搜索集合中的最小值,并查看该值所在的组。

或者,你可以按照“这个数据项包含N”(其中N = 1..50)将它们分成50组。

当进行搜索时,你可以找到每个元素所在的组的大小,然后只搜索最小的那个组。

这种方法的问题在于 - 特别是后者 - 减少搜索时间的开销可能会超过减少搜索空间所带来的性能收益。


+1. 这取决于问题域。如果位串列表在编译时已知,那么这应该可以很好地工作。 - EvilTeach

1
一种可能的方法来分配位图列表,是创建一个(编译后的半字节指示器)数组。
假设你的64位位图中有0到8位被设置了。 我们可以将其表示为十六进制0x000000000000001F。
现在,让我们将其转换为更简单和更小的表示形式。 每个4位半字节,要么至少有一个位被设置,要么没有。 如果有,我们将其表示为1,如果没有,我们将其表示为0。
因此,十六进制值减少为位模式0000000000000011,因为右侧2个半字节是唯一具有位的半字节。创建一个包含65536个值的数组,并将它们用作链表头或一组大数组的集合....
将每个位图编译成其紧凑的CNI。将其添加到正确的列表中,直到所有列表都已编译完成。

然后拿出你的针。将其编译成CNI形式。使用它来赋值,以订阅列表的头部。该列表中的所有位图都有可能匹配。 其他列表中的所有位图都无法匹配。

这是一种分配它们的方法。

现实情况下,我怀疑链表是否能满足您的性能要求。

如果您编写一个将位图编译为CNI的函数,您可以将其用作按CNI对数组进行排序的基础。然后,将您的65536个头的数组简单地订阅到原始数组中作为范围的开始。

另一种技术是仅编译64位位图的一部分,因此您只有较少的头。分析您的模式应该会给您一个关于哪些半字节最有效地将它们分割的想法。

祝您好运,并请告诉我们您最终做了什么。

邪恶。


我认为这不会起作用。你计算了每个CNI的预期条目数吗?假设均匀分布,我计算出50k个集合中约有45%的分数(1 -(1/2)^4)^(50/4)将具有所有位设置的CNI; 这使得您的方案相当无用,因为相当大一部分的集合具有相同的CNI。 - Heinrich Apfelmus
是的,需要对这些模式进行审慎分析。 - EvilTeach

0

您可以构建一个反向索引,其中包含每个元素的“干草堆”列表:

std::set<int> needle;  // {4, 7, 12, 18}
std::vector<std::set<int>> haystacks;
// A list of your each of your data sets:
// 1 {1, 2, 4, 7, 8, 12, 18, 23, 29}
// 2 {3, 4, 6, 7, 15, 23, 34, 38}
// 3 {4, 7, 12, 18}
// 4 {1, 4, 7, 12, 13, 14, 15, 16, 17, 18}
// 5 {2, 4, 6, 7, 13, 

std::hash_map[int, set<int>>  element_haystacks;
// element_haystacks maps each integer to the sets that contain it
// (the key is the integers from the haystacks sets, and 
// the set values are the index into the 'haystacks' vector):
// 1 -> {1, 4}  Element 1 is in sets 1 and 4.
// 2 -> {1, 5}  Element 2 is in sets 2 and 4.
// 3 -> {2}  Element 3 is in set 3.
// 4 -> {1, 2, 3, 4, 5}  Element 4 is in sets 1 through 5.  
std::set<int> answer_sets;  // The list of haystack sets that contain your set.
for (set<int>::const_iterator it = needle.begin(); it != needle.end(); ++it) {
  const std::set<int> &new_answer = element_haystacks[i];
  std::set<int> existing_answer;
  std::swap(existing_answer, answer_sets);
  // Remove all answers that don't occur in the new element list.
  std::set_intersection(existing_answer.begin(), existing_answer.end(),
                        new_answer.begin(), new_answer.end(),
                        inserter(answer_sets, answer_sets.begin()));
  if (answer_sets.empty()) break;  // No matches :(
}

// answer_sets now lists the haystack_ids that include all your needle elements.
for (int i = 0; i < answer_sets.size(); ++i) {
  cout << "set: " << element_haystacks[answer_sets[i]];
}

如果我没记错的话,这个程序的最大运行时间将为O(k*m),其中k是一个整数所属集合的平均数量,m是针集合的平均大小(小于50)。不幸的是,由于构建反向映射(element_haystacks),它将具有显着的内存开销。

如果您存储排序的vectors而不是sets,并且element_haystacks可以是50个元素的vector而不是hash_map,那么我相信您可以稍微改进一下。


假设n=50,000个元素均匀分布,每个桶将包含25,000个条目。毕竟,大约一半的集合包含15号元素,而另一半不包含。您需要比n操作更多才能交集这么大的桶。 - Heinrich Apfelmus
@Heinrich:如果元素均匀分布,你不会期望每个桶有50k/50=1k个元素吗?这意味着set_intersection在O(2k)比较时是有界的。因此,如果你的needle集合相对较小,这仍然应该少于50k次比较。无论如何,我的分析中确实有一个错误。已经修复。 - Stephen
不,如果桶是不相交的,公式50k/50将是正确的,但它们并不是。例如,集合{1,2,...,50}将存在于所有桶中。再次强调,平均而言,50k的一半包含数字15,而另一半则不包含;这使得第15个桶有25k个元素。 - Heinrich Apfelmus
@Heinrich:确实,50k/50会有50k个单元素集合,这是错误的。您假设每个干草堆集合将容纳~25个元素。这种解决方案在更大、重叠的干草堆集合中肯定会失效。另一方面,如果干草列表中平均有5个元素,那么这个解决方案可能在平均情况下表现得可接受。如果我对您的25k=25个元素的理解有误,您将不得不更详细地解释您推理背后的数学。 - Stephen
是的,我假设每个干草堆有大约25个元素。这是通过在从1到50的每个元素上抛硬币来随机选择集合时得到的结果,以决定它是否应该进入。当然,如果集合不是随机选择的,则平均值为5是可能的,您的建议确实可以利用这一点。 - Heinrich Apfelmus

0

我很惊讶没有人提到STL包含一个算法来处理这种情况。因此,你应该使用includes。正如它所描述的那样,它执行最多2*(N+M)-1次比较,最坏情况下的性能为O(M+N)。

因此:

bool isContained = includes( myVector.begin(), myVector.end(), another.begin(), another.end() );

如果你需要 O(log N) 的时间,我必须让位给其他回答者。

没错,"这种事情"并没有将问题的困难部分分类...他有50k个"另一个"。includes并不能减少这个集合...我猜没有人提到它是因为它不是特别相关的。 - Stephen

0
将您的集合放入一个数组中(而不是链表),并对它们进行排序。排序标准可以是1)集合中元素的数量(集合表示中1位的数量),或2)集合中最小的元素。例如,让A={7, 10, 16}和B={11, 17}。则在标准1)下,B
当新的数据项到达时,您可以使用二分查找(对数时间)来找到数组中的起始候选集。然后您可以通过线性搜索数组,并将数据项与数组中的集合进行比较,直到数据项变得“大于”该集合。
您应该根据您的集合分布选择排序标准。如果所有集合的最小元素都是0,则不应选择标准2)。反之,如果集合基数的分布不均匀,则不应选择标准1)。

另一个更强大的排序标准是计算每个集合中元素的跨度,并根据其进行排序。例如,集合A中最小的元素是7,最大的元素是16,因此您将其编码为0x1007;同样,B的跨度将为0x110B。根据“跨度编码”对集合进行排序,并再次使用二进制搜索来查找与数据项具有相同“跨度编码”的所有集合。

在普通的C语言中计算“跨度编码”很慢,但如果您使用汇编语言,它可以快速完成——大多数CPU都有指令以查找最/最不重要的集位。


不要按照计算困难的标准进行排序。只需按其作为64位整数的值进行排序。 - R.. GitHub STOP HELPING ICE
我认为你提出的排序标准不会起作用。你计算了预期的运行时间吗?例如,假设均匀分布,那么50,000组中有一半会包含元素1,使得排序标准1)相当无用。 - Heinrich Apfelmus
我如何在不知道集合分布的情况下计算预期运行时间?这就是为什么我建议几个标准。我猜你指的是第二个标准。 - zvrba
好的,你可以假设最坏的情况,也就是均匀分布。哎呀,我是指第二个标准,确实是这样的。 - Heinrich Apfelmus

0
这不是一个真正的答案,更像是一种观察:这个问题看起来可以有效地并行化甚至分布式处理,这将至少将运行时间降低到O(n /核心数)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接