How to efficiently search for list elements in strings in Python

21

I have a list of concepts (concepts) and a list of sentences (sentences), as shown below.

concepts = [['natural language processing', 'text mining', 'texts', 'nlp'], ['advanced data mining', 'data mining', 'data'], ['discourse analysis', 'learning analytics', 'mooc']]


sentences = ['data mining and text mining', 'nlp is mainly used by discourse analysis community', 'data mining in python is fun', 'mooc data analysis involves texts', 'data and data mining are both very interesting']
In short, I want to find the concepts in the sentences. More specifically, given a list of concepts (e.g. ['natural language processing', 'text mining', 'texts', 'nlp']), I want to identify those concepts in each sentence and replace them with the list's first element (i.e. natural language processing).
For example: if we consider the sentence data mining and text mining, the result should be advanced data mining and natural language processing (because the first elements of the concept lists containing data mining and text mining are advanced data mining and natural language processing, respectively).
The result for the example data above should be:
['advanced data mining and natural language processing', 'natural language processing is mainly used by discourse analysis community', 'advanced data mining in python is fun', 'discourse analysis advanced data mining analysis involves natural language processing', 'advanced data mining and advanced data mining are both very interesting']

I am currently doing this with regular expressions, as follows:

import re

concepts_re = []

# Build one alternation pattern per concept group
for terms in concepts:
    terms_re = "|".join(re.escape(term) for term in terms)
    concepts_re.append(terms_re)

sentences_mapping = []

for sentence in sentences:
    for terms in concepts:
        if len(terms) > 1:
            for term in terms:
                if term in sentence:
                    # Replace any variant from this group with its first element
                    sentence = re.sub(concepts_re[concepts.index(terms)], terms[0], sentence)
    sentences_mapping.append(sentence)

In my real dataset I have about 8 million concepts, so this approach is very inefficient: it takes about 5 minutes to process a single sentence. I'd like to know whether there is an efficient way of doing this in Python.

For those who want to try a longer list of concepts and measure the time, I'm attaching a slightly longer list here: https://drive.google.com/file/d/1OsggJTDZx67PGH4LupXIkCTObla0gDnX/view?usp=sharing

I'm happy to provide more details if needed.


Your code shows that you use regexes, but the explanation doesn't say why. Could direct string replacement be a slight optimization? - Serge Ballesta
@SergeBallesta Thank you for your comment. I also tried direct string replacement, but it was slow. - EmJ
3 Answers

26

The solution provided below has a runtime complexity of approximately O(n), where n is the number of tokens in each sentence.

For 5 million sentences and your concepts.txt it performs the required operations in roughly 30 seconds; see the basic test in the third section.

As for space complexity, you will have to keep a nested dictionary structure (let's simplify it this way for now), say it's O(c*u), where u is the number of unique tokens for concepts of a given (token-wise) length, and c is the length of a concept.

It's hard to pin down the exact complexities, but they should look similar to this (for your example data and the concepts.txt you provided these are quite accurate, but we will get to the gory details as we go through the implementation).

I assume you can split your concepts and sentences on whitespace. If that's not the case, I would advise you to take a look at spaCy, which provides a smarter way to tokenize your data.
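For instance, a minimal tokenization sketch (assuming spaCy is installed; spacy.blank creates a lightweight tokenizer-only pipeline, no trained model required):

import spacy

nlp = spacy.blank("en")  # tokenizer-only pipeline
tokens = [token.text for token in nlp("nlp is mainly used by discourse analysis community")]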

1. Introduction

Let's take your example:

concepts = [
    ["natural language processing", "text mining", "texts", "nlp"],
    ["advanced data mining", "data mining", "data"],
    ["discourse analysis", "learning analytics", "mooc"],
]

As you said, each concept element has to be mapped to the first one, so, in Pythonish, it would go roughly along these lines:

for concept in concepts:
    concept[1:] = concept[0]  # pseudocode: every alias maps to the head element
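A runnable equivalent of that intent is a flat alias-to-target dictionary; this is only a sketch of the mapping itself and ignores the multi-token overlap problem discussed below:

mapping = {alias: concept[0] for concept in concepts for alias in concept[1:]}
mapping["nlp"]  # -> 'natural language processing'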

If the token-wise length of every concept were equal to 1 (which is not the case here), the task would be easy and unambiguous. Let's focus on the second case and one particular (slightly modified) example of a concept to illustrate my point:

["advanced data mining", "data something", "data"]

Here, data would be mapped to advanced data mining, BUT data something, which contains data, should be mapped before it. If I understand you correctly, you would want the sentence:

"Here is data something and another data"
to be mapped to:
"Here is advanced data mining and another advanced data mining"

instead of the naive approach:

"Here is advanced data mapping something and another advanced data mining"

Notice that in the second example we mapped only data, not data something.

To prioritize data something (and anything else fitting this pattern), I used an array structure filled with dictionaries, where concepts that come earlier in the array are those that are longer token-wise.

Continuing with our example, such an array would look like this:

structure = [
    {"data": {"something": "advanced data mining"}},
    {"data": "advanced data mining"},
]

Notice that if we go through the tokens in this order (e.g. first traversing the first dictionary with consecutive tokens and, if no match is found, moving on to the second dictionary, and so on), we will get the longest concepts first.
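Concretely, looking the example tokens up level by level:

structure[0]["data"]["something"]  # -> 'advanced data mining' (longest match wins)
structure[1]["data"]               # -> 'advanced data mining' (fallback for bare "data")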

2. Code

Okay, I hope you get the basic idea (if not, post a comment below and I will try to explain the unclear parts in more detail).

Disclaimer: I'm not particularly proud of this code-wise, but it gets the job done, and I suppose it could be worse.

2.1 Hierarchical dictionary

First, let's get the longest concept token-wise (excluding the first element, as it is our target and we never have to change it):

def get_longest(concepts: List[List[str]]):
    return max(len(text.split()) for concept in concepts for text in concept[1:])
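For instance, for the modified concept from the introduction:

get_longest([["advanced data mining", "data something", "data"]])  # -> 2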

Using this information, we can initialize our structure by creating as many dictionaries as there are different concept lengths (2 in the example above). The same holds for all of your data; concepts of any length will do:

def init_hierarchical_dictionaries(longest: int):
    return [(length, {}) for length in reversed(range(longest))]
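Each dictionary is paired with the number of nested lookups it requires before the final one; for the example above (longest concept of 2 tokens):

init_hierarchical_dictionaries(2)  # -> [(1, {}), (0, {})]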

Notice that I am keeping each concept length alongside its dictionary in the array. In my opinion it makes traversal easier, although you could do without it after some changes to the implementation.

Now, with these helper functions in place, we can create the structure from the list of concepts:

def create_hierarchical_dictionaries(concepts: List[List[str]]):
    # Initialization
    longest = get_longest(concepts)
    hierarchical_dictionaries = init_hierarchical_dictionaries(longest)

    for concept in concepts:
        for text in concept[1:]:
            tokens = text.split()
            # Initialize dictionary; get the one with corresponding length.
            # The longer, the earlier it is in the hierarchy
            current_dictionary = hierarchical_dictionaries[longest - len(tokens)][1]
            # All of the tokens except the last one are another dictionary mapping to
            # the next token in concept.
            for token in tokens[:-1]:
                # setdefault keeps an already-created branch intact when two
                # concepts of the same length share a prefix
                current_dictionary = current_dictionary.setdefault(token, {})

            # Last token is mapped to the first concept
            current_dictionary[tokens[-1]] = concept[0].split()

    return hierarchical_dictionaries

This function will create our hierarchical dictionary; see the comments in the source code for some explanation. You may want to create a custom class wrapping this thing, which should make it easier to use.

This is exactly the same object as described in 1. Introduction.
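For the modified example from the introduction, this yields the following (note that the leaves are token lists rather than the plain strings used in the simplified sketch above):

create_hierarchical_dictionaries([["advanced data mining", "data something", "data"]])
# -> [(1, {'data': {'something': ['advanced', 'data', 'mining']}}),
#     (0, {'data': ['advanced', 'data', 'mining']})]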

2.2 Traversing the dictionaries

This part is much harder, but this time we will take a top-down approach. We'll start easy:

def embed_sentences(sentences: List[str], hierarchical_dictionaries):
    return (traverse(sentence, hierarchical_dictionaries) for sentence in sentences)

Provided with the hierarchical dictionaries, it creates a generator that transforms each sentence according to the concept mapping.
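Note that it is lazy: given the structure built above, nothing is computed until you iterate over it:

results = embed_sentences(sentences, hierarchical_dictionaries)  # nothing computed yet
first_mapped = next(results)  # sentences are transformed one at a time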

Now the traverse function:

def traverse(sentence: str, hierarchical_dictionaries):
    # Get all tokens in the sentence
    tokens = sentence.split()
    output_sentence = []
    # Initialize index to the first token
    index = 0
    # Until any tokens left to check for concepts
    while index < len(tokens):
        # Iterate over hierarchical dictionaries (elements of the array)
        for hierarchical_dictionary_tuple in hierarchical_dictionaries:
            # New index is returned based on match and token-wise length of concept
            index, concept = traverse_through_dictionary(
                index, tokens, hierarchical_dictionary_tuple
            )
            # Concept was found in current hierarchical_dictionary_tuple, let's add it
            # to output
            if concept is not None:
                output_sentence.extend(concept)
                # No need to check other hierarchical dictionaries for matching concept
                break
        # Token (and its next tokens) do not match with any concept, return original
        else:
            output_sentence.append(tokens[index])
        # Increment index in order to move to the next token
        index += 1

    # Join list of tokens into a sentence
    return " ".join(output_sentence)

Once again, if you're not sure what's going on, post a comment.

Using this approach, pessimistically, we will perform O(n*c!) checks, where n is the number of tokens in the sentence and c! is the factorial of the token-wise length of the longest concept. This case is extremely unlikely in practice: every token in the sentence would have to fit the longest concept almost perfectly, and all the shorter concepts would have to be prefixes of the longer ones (e.g. super data mining, super data and data).

For any practical problem it will be much closer to O(n); as I said before, for the data you provided in the .txt file it is O(3*n) worst case, usually O(2*n).
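Putting the pieces together on the introduction's example (a quick sanity sketch; it uses traverse_through_dictionary, defined next):

hierarchical_dictionaries = create_hierarchical_dictionaries(
    [["advanced data mining", "data something", "data"]]
)
traverse("Here is data something and another data", hierarchical_dictionaries)
# -> 'Here is advanced data mining and another advanced data mining'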

Traversing through each dictionary:

def traverse_through_dictionary(index, tokens, hierarchical_dictionary_tuple):
    # Get the level of nested dictionaries and initial dictionary
    length, current_dictionary = hierarchical_dictionary_tuple
    # inner_index will loop through tokens until match or no match was found
    inner_index = index
    for _ in range(length):
        # Get next nested dictionary and move inner_index to the next token
        current_dictionary = current_dictionary.get(tokens[inner_index])
        inner_index += 1
        # If no match was found in any level of dictionary
        # Return current index in sentence and None representing lack of concept.
        if current_dictionary is None or inner_index >= len(tokens):
            return index, None

    # If everything went fine through all nested dictionaries, check whether
    # last token corresponds to concept
    concept = current_dictionary.get(tokens[inner_index])
    if concept is None:
        return index, None
    # If so, return inner_index (we have moved length tokens, so we have to update it)
    return inner_index, concept

This constitutes the meat of my solution.
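To illustrate its contract on the structure from the sketch above: on a match it returns the index of the last consumed token together with the target tokens, otherwise the original index and None:

level = hierarchical_dictionaries[0]  # (1, {'data': {'something': [...]}})
traverse_through_dictionary(0, ["data", "something"], level)
# -> (1, ['advanced', 'data', 'mining'])  # matched; index moved to last consumed token
traverse_through_dictionary(0, ["data", "mining"], level)
# -> (0, None)  # no concept found at this level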

3. Results

Now, for brevity, the whole source code is provided below (concepts.txt is the file you provided):

import ast
import time
from typing import List


def get_longest(concepts: List[List[str]]):
    return max(len(text.split()) for concept in concepts for text in concept[1:])


def init_hierarchical_dictionaries(longest: int):
    return [(length, {}) for length in reversed(range(longest))]


def create_hierarchical_dictionaries(concepts: List[List[str]]):
    # Initialization
    longest = get_longest(concepts)
    hierarchical_dictionaries = init_hierarchical_dictionaries(longest)

    for concept in concepts:
        for text in concept[1:]:
            tokens = text.split()
            # Initialize dictionary; get the one with corresponding length.
            # The longer, the earlier it is in the hierarchy
            current_dictionary = hierarchical_dictionaries[longest - len(tokens)][1]
            # All of the tokens except the last one are another dictionary mapping to
            # the next token in concept.
            for token in tokens[:-1]:
                # setdefault keeps an already-created branch intact when two
                # concepts of the same length share a prefix
                current_dictionary = current_dictionary.setdefault(token, {})

            # Last token is mapped to the first concept
            current_dictionary[tokens[-1]] = concept[0].split()

    return hierarchical_dictionaries


def traverse_through_dictionary(index, tokens, hierarchical_dictionary_tuple):
    # Get the level of nested dictionaries and initial dictionary
    length, current_dictionary = hierarchical_dictionary_tuple
    # inner_index will loop through tokens until match or no match was found
    inner_index = index
    for _ in range(length):
        # Get next nested dictionary and move inner_index to the next token
        current_dictionary = current_dictionary.get(tokens[inner_index])
        inner_index += 1
        # If no match was found in any level of dictionary
        # Return current index in sentence and None representing lack of concept.
        if current_dictionary is None or inner_index >= len(tokens):
            return index, None

    # If everything went fine through all nested dictionaries, check whether
    # last token corresponds to concept
    concept = current_dictionary.get(tokens[inner_index])
    if concept is None:
        return index, None
    # If so, return inner_index (we have moved length tokens, so we have to update it)
    return inner_index, concept


def traverse(sentence: str, hierarchical_dictionaries):
    # Get all tokens in the sentence
    tokens = sentence.split()
    output_sentence = []
    # Initialize index to the first token
    index = 0
    # Until any tokens left to check for concepts
    while index < len(tokens):
        # Iterate over hierarchical dictionaries (elements of the array)
        for hierarchical_dictionary_tuple in hierarchical_dictionaries:
            # New index is returned based on match and token-wise length of concept
            index, concept = traverse_through_dictionary(
                index, tokens, hierarchical_dictionary_tuple
            )
            # Concept was found in current hierarchical_dictionary_tuple, let's add it
            # to output
            if concept is not None:
                output_sentence.extend(concept)
                # No need to check other hierarchical dictionaries for matching concept
                break
        # Token (and its next tokens) do not match with any concept, return original
        else:
            output_sentence.append(tokens[index])
        # Increment index in order to move to the next token
        index += 1

    # Join list of tokens into a sentence
    return " ".join(output_sentence)


def embed_sentences(sentences: List[str], hierarchical_dictionaries):
    return (traverse(sentence, hierarchical_dictionaries) for sentence in sentences)


def sanity_check():
    concepts = [
        ["natural language processing", "text mining", "texts", "nlp"],
        ["advanced data mining", "data mining", "data"],
        ["discourse analysis", "learning analytics", "mooc"],
    ]
    sentences = [
        "data mining and text mining",
        "nlp is mainly used by discourse analysis community",
        "data mining in python is fun",
        "mooc data analysis involves texts",
        "data and data mining are both very interesting",
    ]

    targets = [
        "advanced data mining and natural language processing",
        "natural language processing is mainly used by discourse analysis community",
        "advanced data mining in python is fun",
        "discourse analysis advanced data mining analysis involves natural language processing",
        "advanced data mining and advanced data mining are both very interesting",
    ]

    hierarchical_dictionaries = create_hierarchical_dictionaries(concepts)

    results = list(embed_sentences(sentences, hierarchical_dictionaries))
    if results == targets:
        print("Correct results")
    else:
        print("Incorrect results")


def speed_check():
    with open("./concepts.txt") as f:
        concepts = ast.literal_eval(f.read())

    initial_sentences = [
        "data mining and text mining",
        "nlp is mainly used by discourse analysis community",
        "data mining in python is fun",
        "mooc data analysis involves texts",
        "data and data mining are both very interesting",
    ]

    sentences = initial_sentences.copy()

    for i in range(1_000_000):
        sentences += initial_sentences

    start = time.time()
    hierarchical_dictionaries = create_hierarchical_dictionaries(concepts)
    middle = time.time()
    letters = []
    for result in embed_sentences(sentences, hierarchical_dictionaries):
        letters.append(result[0].capitalize())
    end = time.time()
    print(f"Time for hierarchical creation {(middle-start) * 1000.0} ms")
    print(f"Time for embedding {(end-middle) * 1000.0} ms")
    print(f"Overall time elapsed {(end-start) * 1000.0} ms")


def main():
    sanity_check()
    speed_check()


if __name__ == "__main__":
    main()

Results of the speed check are provided below:

Time for hierarchical creation 107.71822929382324 ms
Time for embedding 30460.427284240723 ms
Overall time elapsed 30568.145513534546 ms

So, for 5 million sentences (the 5 sentences you provided, concatenated 1 million times) and the concepts file you provided (1.1 MB), it takes roughly 30 seconds to perform the concept mapping, which isn't bad, I suppose.

Worst case scenario, the dictionary should take as much memory as your input file (concepts.txt in this case), but it will usually be lower or much lower, as it depends on the combination of concept lengths and the uniqueness of those words.


Wow, this is impressive. I will run this code on my real dataset and let you know how it performs. Thank you for such a great answer. - EmJ
Thank you again for the great answer. When I ran your code on my real dataset, I got the following error: possible_dictionary = current_dictionary.get(tokens[inner_index]) IndexError: list index out of range. Is there a way to resolve this? I'm happy to provide more details if needed. Looking forward to hearing from you. - EmJ
I'm attaching a sample concepts list here (link: https://drive.google.com/file/d/1U2gT0umy-iFdP1G5ELkY1veMZJmtLEC8/view?usp=sharing). I get the error for the sentence "world wide web www". I guess the problem is with "www". Looking forward to hearing from you. Thanks :) - EmJ
Sorry, I didn't have time to work on it earlier, but it should be fixed now, as I've updated the traverse_through_dictionary function in my answer. How is the performance in your case, by the way? - Szymon Maszke
Thank you so much for the update, I really appreciate it. The update fixed the issue. Now I can run the whole dataset in about 5 minutes (which is quite impressive). I highly recommend this solution to anyone reading this post. Thanks again :) - EmJ

5

Use a suffix array approach.

Skip this step if your data is already sanitized.

First, sanitize your data by replacing all whitespace characters with any character that you know won't be part of any concept or sentence.

Then build suffix arrays for all the sentences. This takes O(n log n) time per sentence; there are algorithms that can do it in O(n) time using suffix trees.

Once you have the suffix arrays ready for all the sentences, just perform a binary search for each of your concepts.

You can further optimize the search using an LCP array. Refer to Kasai's algorithm.

Using both the LCP and the suffix arrays, the time complexity of the search can be brought down to O(n).

Edit: This approach is generally used in sequence alignment on genomes and is quite popular as well. You should easily find implementations that suit you.
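Below is a minimal, purely illustrative sketch of the idea in Python (naive quadratic construction for clarity; the helper names are mine, and the O(n log n)/O(n) builders and the LCP optimization mentioned above are omitted):

def suffix_array(s: str):
    # Naive build: indices of all suffixes of s, sorted lexicographically
    return sorted(range(len(s)), key=lambda i: s[i:])

def find_occurrences(s: str, sa, pattern: str):
    # Binary search for the leftmost suffix whose prefix is >= pattern
    lo, hi = 0, len(sa)
    while lo < hi:
        mid = (lo + hi) // 2
        if s[sa[mid]:sa[mid] + len(pattern)] < pattern:
            lo = mid + 1
        else:
            hi = mid
    # Collect the contiguous block of suffixes that start with pattern
    matches = []
    while lo < len(sa) and s[sa[lo]:sa[lo] + len(pattern)] == pattern:
        matches.append(sa[lo])
        lo += 1
    return sorted(matches)

sentence = "data_mining_and_text_mining"  # whitespace already sanitized with "_"
sa = suffix_array(sentence)
print(find_occurrences(sentence, sa, "mining"))       # -> [5, 21]
print(find_occurrences(sentence, sa, "data_mining"))  # -> [0]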


"Build suffix arrays for all the sentences" - is that one array per sentence, or one array for all the sentences? When sanitizing the sentences, is the result a single string with a placeholder for the whitespace within sentences and a different placeholder for the sentence boundaries? - wwii
One array per sentence. The reason is that the OP wants to find every concept in each sentence. Regarding your second point, I don't really like fiddling with the whitespace; the sanitizer is just for clarity. So yes, it would be a single string with placeholders. I hadn't considered merging all the sentences with boundary placeholders. But even if we merge all the sentences, the suffixes still come out in order, so we can do a second binary search to find all the occurrences within the merged sentence. So maybe merging is the better solution. Thanks for pointing it out. - HariUserX

2
import re
concepts = [['natural language processing', 'text mining', 'texts', 'nlp'], ['advanced data mining', 'data mining', 'data'], ['discourse analysis', 'learning analytics', 'mooc']]
sentences = ['data mining and text mining', 'nlp is mainly used by discourse analysis community', 'data mining in python is fun', 'mooc data analysis involves texts', 'data and data mining are both very interesting']

replacementDict = {concept[0] : concept[1:] for concept in concepts}

finderAndReplacements = [
    (re.compile('(' + '|'.join(replacees) + ')'), replacement)
    for replacement, replacees in replacementDict.items()
]

def sentenceReplaced(findRegEx, replacement, sentence):
    return findRegEx.sub(replacement, sentence, count=0)

def sentencesAllReplaced(sentences, finderAndReplacements=finderAndReplacements):
    for regex, replacement in finderAndReplacements:
        sentences = [sentenceReplaced(regex, replacement, sentence) for sentence in sentences]
    return sentences

print(sentencesAllReplaced(sentences))
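This prints:

['advanced data mining and natural language processing', 'natural language processing is mainly used by discourse analysis community', 'advanced data mining in python is fun', 'discourse analysis advanced data mining analysis involves natural language processing', 'advanced data mining and advanced data mining are both very interesting']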

Setup: I preferred to turn the concepts into a dictionary where the keys and values are the replacements and the replacees, stored in replacementDict.
For each group of intended replacements, compile a matching regex and store it together with its intended replacement in the finderAndReplacements list.
The sentenceReplaced function returns the input sentence after the substitution has been performed. (The order of application won't matter here, so parallelization should be possible if we take care to avoid race conditions.)
Finally, we cycle through each sentence and find/replace. (A great deal of parallel structures would offer improved performance.)
I'd love to see some thorough benchmarking/testing/reporting, because I'm sure there are a lot of subtleties to this task depending on the nature of the inputs (concepts, sentences) and the hardware running it.
In the case where sentences are the dominant input component compared to the concept replacements, I believe compiling the regexes will be advantageous. When there are few sentences and many concepts, especially if most concepts don't appear in any sentence, compiling these matchers would be a waste. And if there are very many replacees for every replacement, the compiled method may perform poorly or even error out... (Varying assumptions about the input parameters offer a multitude of trade-off considerations, as is often the case.)
