在一组集合中,找出一个集合与所有其他集合相似度的比较方法。

5

我正在尝试通过计算匹配元素的数量来比较一个集合与集合中所有其他集合的相似度。一旦我获得了计数,我想针对具有最高计数的前X个(目前为100个)相似集合(具有最高计数的集合)执行进一步操作。 我提供了一个示例输入和输出,其中显示了两个集合之间匹配元素的计数:

输入

{
  "list1": [
    "label1",
    "label2",
    "label3"
  ],
  "list2": [
    "label2",
    "label3",
    "label4"
  ],
  "list3": [
    "label3",
    "label4",
    "label5"
  ],
  "list4": [
    "label4",
    "label5",
    "label6"
  ]
}

输出

{
  "list1": {
    "list1": 3,
    "list2": 2,
    "list3": 1,
    "list4": 0
  },
  "list2": {
    "list1": 2,
    "list2": 3,
    "list3": 2,
    "list4": 1
  },
  "list3": {
    "list1": 1,
    "list2": 2,
    "list3": 3,
    "list4": 2
  },
  "list4": {
    "list1": 0,
    "list2": 1,
    "list3": 2,
    "list4": 3
  }
}

我想到以下代码,但对于大约200,000组输入需要数小时才能运行完成。每个组中元素/标签的数量有所不同,但每个组平均约有10个元素。唯一标签值的总数约为300个。
    input = {}
    input['list1'] = ['label1', 'label2', 'label3']
    input['list2'] = ['label2', 'label3', 'label4']
    input['list3'] = ['label3', 'label4', 'label5']
    input['list4'] = ['label4', 'label5', 'label6']
    print(json.dumps(input, indent=2))
    input = {key: set(value) for key, value in input.items()}
    output = {key1: {key2: 0 for key2 in input.keys()} for key1 in input.keys()}
    for key1, value1 in input.items():
        for key2, value2 in input.items():
            for element in value1:
                if element in value2:
                    count = output[key1][key2]
                    output[key1][key2] = count + 1

    print(json.dumps(output, indent=2))

当集合的数量比较大时,有没有任何想法可以提高以上代码的执行时间?

感谢任何建议!


只是为了确保我理解:那些“进一步的操作”将把这200,000个集合中的每一个都与它的100个最相似的其他集合组合起来?所以还是20,000,000次操作? - no comment
这些标签是否相对随机分布,或者某些标签或标签组合是否比其他标签更常见? - no comment
1
@don'ttalkjustcode 是的,“进一步操作”将会把这20万个集合中的每一个都与其最相似的100个其他集合结合起来。这些标签分布相对随机。 - jbwt
2个回答

5

使用倒排索引来避免计算与那些交集的基数为0的集合的交集:

from collections import defaultdict, Counter
from itertools import chain
from pprint import pprint

data = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"]
}

index = defaultdict(list)
for key, values in data.items():
    for value in values:
        index[value].append(key)

result = {key: Counter(chain.from_iterable(index[label] for label in labels)) for key, labels in data.items()}
pprint(result)

输出

{'list1': Counter({'list1': 3, 'list2': 2, 'list3': 1}),
 'list2': Counter({'list2': 3, 'list1': 2, 'list3': 2, 'list4': 1}),
 'list3': Counter({'list3': 3, 'list2': 2, 'list4': 2, 'list1': 1}),
 'list4': Counter({'list4': 3, 'list3': 2, 'list2': 1})}

如果必要,您可以按照以下方式包含那些交集基数为0的集合:
result = {key: {k: value.get(k, 0) for k in data} for key, value in result.items()}
pprint(result)

输出

{'list1': {'list1': 3, 'list2': 2, 'list3': 1, 'list4': 0},
 'list2': {'list1': 2, 'list2': 3, 'list3': 2, 'list4': 1},
 'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2},
 'list4': {'list1': 0, 'list2': 1, 'list3': 2, 'list4': 3}}

第二种选择来自于观察到大部分时间都用于查找集合的交集,因此使用更快速的数据结构,如Roaring Bitmap是有用的:

from collections import defaultdict
from pprint import pprint
from pyroaring import BitMap

data = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"]
}

# all labels
labels = set().union(*data.values())

# lookup mapping to an integer
lookup = {key: value for value, key in enumerate(labels)}

roaring_data = {key: BitMap(lookup[v] for v in value) for key, value in data.items()}


result = defaultdict(dict)
for k_out, outer in roaring_data.items():
    for k_in, inner in roaring_data.items():
        result[k_out][k_in] = len(outer & inner)

pprint(result)

输出

defaultdict(<class 'dict'>,
            {'list1': {'list1': 3, 'list2': 2, 'list3': 1, 'list4': 0},
             'list2': {'list1': 2, 'list2': 3, 'list3': 2, 'list4': 1},
             'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2},
             'list4': {'list1': 0, 'list2': 1, 'list3': 2, 'list4': 3}})

性能分析

Perfomance Comparison

上面的图表显示了在一个长度由x轴的值给出的字典data上的性能,字典中的每个值都是从一个有100个标签的总体中随机抽样的10个标签的列表。与直觉相反,Roaring Bitmap的性能最差,而使用倒排索引所需时间不到一半(大约40%)。可以在这里找到复制上述结果的代码。

1
BitMap可以接受任何可迭代对象吗,还是只能是list?使用BitMap(lookup[v] for v in value)会更高效。 - sj95126
@sj95126 它可以接受任何可迭代对象。已修复。 - Dani Mesejo
1
在我的小型测试中,使用了20,000个集合,每个集合包含来自300个可能标签值的选择中随机化的10个标签。使用您提供的倒排索引解决方案是一种改进。 - jbwt

3

假设大多数列表对没有交集,则下面的代码应该更快。如果速度不够快,并且可以接受近似结果,则可以尝试min-hashing(将k设置为更低的值以获得更高的速度,将其设置为更高的值以获得更高的召回率)。

input = {
    "list1": ["label1", "label2", "label3"],
    "list2": ["label2", "label3", "label4"],
    "list3": ["label3", "label4", "label5"],
    "list4": ["label4", "label5", "label6"],
}


import collections
import hashlib


def optional_min_hash(values, k=None):
    return (
        values
        if k is None
        else sorted(
            hashlib.sha256(str(value).encode("utf8")).digest() for value in values
        )[:k]
    )


buckets = collections.defaultdict(list)
for key, values in input.items():
    for value in optional_min_hash(values):
        buckets[value].append(key)
output = collections.defaultdict(dict)
for key1, key2 in {
    (key1, key2)
    for bucket in buckets.values()
    for key1 in bucket
    for key2 in bucket
    if key1 <= key2
}:
    count = len(set(input[key1]) & set(input[key2]))
    output[key1][key2] = count
    output[key2][key1] = count
print(output)

示例输出:

defaultdict(<class 'dict'>, {'list2': {'list4': 1, 'list1': 2, 'list2': 3, 'list3': 2}, 'list4': {'list2': 1, 'list4': 3, 'list3': 2}, 'list1': {'list2': 2, 'list3': 1, 'list1': 3}, 'list3': {'list1': 1, 'list2': 2, 'list3': 3, 'list4': 2}})

从有限的测试来看,这个答案有所改进。在计算完计数后取前X个结果有点随意,因此近似结果是可以接受的,而最小哈希的想法非常有帮助。 - jbwt

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接