我有一个很长的字符串,比如说"aaaaaaaaaaabbbbbbbbbcccccccccccddddddddddd"(但可能更长),还有很多小字符串的集合。我想计算(可以重叠)这些小字符串在大字符串中出现的次数。我只关心速度。KMP算法似乎不错,但它处理多个字符串时速度较慢,而Rabin-Karp算法可以解决这个问题,但速度较慢。
我有一个很长的字符串,比如说"aaaaaaaaaaabbbbbbbbbcccccccccccddddddddddd"(但可能更长),还有很多小字符串的集合。我想计算(可以重叠)这些小字符串在大字符串中出现的次数。我只关心速度。KMP算法似乎不错,但它处理多个字符串时速度较慢,而Rabin-Karp算法可以解决这个问题,但速度较慢。
from collections import defaultdict
class SuffixTree:
def __init__(self):
"""Returns an empty suffix tree"""
self.T=''
self.E={}
self.nodes=[-1] # 0th node is empty string
def add(self,s):
"""Adds the input string to the suffix tree.
This inserts all substrings into the tree.
End the string with a unique character if you want a leaf-node for every suffix.
Produces an edge graph keyed by (node,character) that gives (first,last,end)
This means that the edge has characters from T[first:last+1] and goes to node end."""
origin,first,last = 0,len(self.T),len(self.T)-1
self.T+=s
nc = len(self.nodes)
self.nodes += [-1]*(2*len(s))
T=self.T
E=self.E
nodes=self.nodes
Lm1=len(T)-1
for last_char_index in xrange(first,len(T)):
c=T[last_char_index]
last_parent_node = -1
while 1:
parent_node = origin
if first>last:
if (origin,c) in E:
break
else:
key = origin,T[first]
edge_first, edge_last, edge_end = E[key]
span = last - first
A = edge_first+span
m = T[A+1]
if m==c:
break
E[key] = (edge_first, A, nc)
nodes[nc] = origin
E[nc,m] = (A+1,edge_last,edge_end)
parent_node = nc
nc+=1
E[parent_node,c] = (last_char_index, Lm1, nc)
nc+=1
if last_parent_node>0:
nodes[last_parent_node] = parent_node
last_parent_node = parent_node
if origin==0:
first+=1
else:
origin = nodes[origin]
if first <= last:
edge_first,edge_last,edge_end=E[origin,T[first]]
span = edge_last-edge_first
while span <= last - first:
first+=span+1
origin = edge_end
if first <= last:
edge_first,edge_last,edge_end = E[origin,T[first]]
span = edge_last - edge_first
if last_parent_node>0:
nodes[last_parent_node] = parent_node
last+=1
if first <= last:
edge_first,edge_last,edge_end=E[origin,T[first]]
span = edge_last-edge_first
while span <= last - first:
first+=span+1
origin = edge_end
if first <= last:
edge_first,edge_last,edge_end = E[origin,T[first]]
span = edge_last - edge_first
return self
def make_choices(self):
"""Construct a sorted list for each node of the possible continuing characters"""
choices = [list() for n in xrange(len(self.nodes))] # Contains set of choices for each node
for (origin,c),edge in self.E.items():
choices[origin].append(c)
choices=[sorted(s) for s in choices] # should not have any repeats by construction
self.choices=choices
return choices
def count_suffixes(self,term):
"""Recurses through the tree finding how many suffixes are based at each node.
Strings assumed to use term as the terminating character"""
C = self.suffix_counts = [0]*len(self.nodes)
choices = self.make_choices()
def f(node=0):
t=0
X=choices[node]
if len(X)==0:
t+=1 # this node is a leaf node
else:
for c in X:
if c==term:
t+=1
continue
first,last,end = self.E[node,c]
t+=f(end)
C[node]=t
return t
return f()
def count_matches(self,needle):
"""Return the count of matches for this needle in the suffix tree"""
i=0
node=0
E=self.E
T=self.T
while i<len(needle):
c=needle[i]
key=node,c
if key not in E:
return 0
first,last,node = E[key]
while i<len(needle) and first<=last:
if needle[i]!=T[first]:
return 0
i+=1
first+=1
return self.suffix_counts[node]
big="aaaaaaaaaaabbbbbbbbbcccccccccccddddddddddd"
small_strings=["a","ab","abc"]
s=SuffixTree()
term=chr(0)
s.add(big+term)
s.count_suffixes(term)
for needle in small_strings:
x=s.count_matches(needle)
print needle,'has',x,'matches'
它会打印出:
a has 11 matches
ab has 1 matches
abc has 0 matches
然而,实践中我建议您仅使用现有的Aho-Corasick实现,因为我预计在您的特定情况下这将会更快。
在大量字符串中进行匹配听起来像是http://en.wikipedia.org/wiki/Aho%E2%80%93Corasick_string_matching_algorithm。它一次只能找到一个匹配项,因此如果有大量的匹配项,Peter de Rivaz的想法可能更好。另一方面,Aho-Corasick不需要将大字符串保留在内存中-您只需通过流即可-并且非常实用和易于实现和调整-维基百科链接指出,原始的fgrep使用了它。
思考一下,你可以解决超级匹配问题。Aho-Corasick 可以被视为创建一个确定的有限状态机,只能识别它正在搜索的每个字符串。机器的状态对应于最后看到的 N 个字符。如果您想匹配两个字符串,并且其中一个是另一个的后缀,则需要小心处理当您处于状态时,表示您刚刚匹配了更长的字符串时,您也要认识到这意味着您刚刚匹配了较短的字符串。如果您故意选择不这样做,则为较短的字符串累积的计数将不正确 - 但您知道它们比更长的字符串出现的次数低。因此,如果您修改 Aho-Corasick,使其仅识别和计算每个点上的最长匹配项,则匹配成本仍然与您正在搜索的字符串中的字符数成线性关系,并且您可以通过查看长字符串并递增后缀为长字符串的短字符串的计数来修复计数。这将花费的时间最多为正在搜索的字符串的总大小的线性时间。1)我会选择有限状态自动机。目前想不到一个专门的库,但通用的 PCRE 可以用于构建高效搜索给定子字符串的自动机。对于子字符串 "foo" 和 "bar",可以构造模式 /(foo)|(bar)/,扫描字符串并通过迭代 ovector 检查哪个组匹配来获取匹配子字符串的 "id" 编号。
如果您只需要总数而不是按子字符串分组,则 RE2::FindAndConsume 是一个好选择。
P.S. 以下是使用 Boost.Xpressive 并从映射中加载字符串的示例:http://ericniebler.com/2010/09/27/boost-xpressive-ftw/
P.P.S. 最近我为类似任务创建了一个 Ragel 机器,并且过程非常愉快。对于一小组搜索字符串,"正常" DFA 就足够了,但如果您有更大的规则集,则使用 Ragel 扫描器会显示出色的结果(这里是一个 相关答案)。
P.P.P.S. PCRE 具有超级有用的 MARK
关键字,可用于此类子模式分类(cf)。
2) 相当长一段时间以前,我用Scala编写了一个基于Trie的东西来处理这种负载:https://gist.github.com/ArtemGr/6150594;Trie.search在字符串上进行匹配,尝试将当前位置与Trie中编码的数字匹配。Trie被编码为单个缓存友好数组,我希望它能像非JIT DFA一样好。
3) 我一直在使用boost::spirit进行子字符串匹配,但从未测量过它与其他方法相比如何表现。Spirit使用一些有效的结构来进行符号匹配,也许可以将该结构单独使用,而无需Spirit的开销。
#include <boost/spirit/include/qi.hpp>
#include <boost/spirit/include/phoenix.hpp>
using qi::lit; using qi::alnum; using qi::digit; using qi::_val; using qi::_1; using boost::phoenix::ref;
static struct exact_t: qi::symbols<char, int> {
exact_t() {add
("foo", 1)
("bar", 2);}
} exact;
int32_t match = -1;
qi::rule<const char*, int()> rule =
+alnum >> exact [ref (match) = _1];
const char* it = haystack; // Mutable iterator for Spirit.
qi::parse (it, haystack + haystackLen, rule);
如果我理解正确,您的输入字符串由许多单字符块组成。
在这种情况下,您可以使用Run-length encoding压缩文本。
例如:
s = aaabbbbcc
被编码为:
encoded_s = (a3)(b4)(c2)
在另一个答案的基础上,(并希望说服您这是最好的类型答案),您可以查看http://en.wikipedia.org/wiki/Suffix_tree并通过其中列出的参考资料了解后缀树,以便了解如何获得匹配数而无需遍历所有匹配项,您所获得的运行时间和内存要求是任何子字符串匹配或匹配计数算法的最佳选择。一旦您理解了后缀树的工作原理以及如何构建/使用它,那么您需要的唯一额外调整就是在树的每个内部节点处存储正在表示的不同字符串起始位置的数量,这是一个小修改,您可以通过递归地从子节点获取计数并将它们相加来高效地进行(如已声明的)线性时间,然后这些计数可以让您计算子字符串匹配,而不必遍历所有匹配项。