如何加速在字节数组中查找模式

6
我有一个大约70MB大小的二进制文件。在我的程序中,我有一种方法,可以对字节数组模式与该文件进行比较,以查看它们是否存在于文件中。我需要运行1-10百万个这样的模式。以下是我可以考虑的选项:
  1. 通过使用 byte[] file = File.ReadAllBytes(path) 将文件读入内存,然后对字节数组模式与文件字节进行比较。我使用了许多不同主题上的方法来实现这一点,如: byte[] array pattern search Find an array (byte[]) inside another array? Best way to check if a byte[] is contained in another byte[] 但是,当源数据很大时,byte[]与byte[]之间的比较非常慢。在普通计算机上运行1百万个模式需要数周时间。
  2. 将文件和模式都转换为十六进制字符串,然后使用contains()方法进行比较。这比byte[]比较快,但是将字节转换为十六进制会导致内存中的文件变大,从而增加处理时间。
  3. 使用Encoding.GetEncoding(1252).GetBytes()将文件和模式都转换为字符串,然后执行查找。然后通过运行contains()匹配另一个执行byte[]查找的方法(建议选择第一种方法)来补偿二进制到字符串转换的限制(我知道它们是不兼容的)。这对我来说是最快的选项。
使用第三种方法,1百万个模式需要2/3天到1天的时间,具体取决于CPU。我需要了解如何加速查找。
谢谢。
编辑:感谢 @MySkullCaveIsADarkPlace,现在我有了第四个比上述三个方法更快的方法。我曾经使用过有限的byte[]查找算法,现在我正在使用略微比上述三种方法更快的MemoryExtensions.IndexOf() byte[]查找方法。尽管这个方法更快,但查找仍然很慢。1000个模式查找需要1分钟。
这些模式每个包含12-20字节。

1
“这个比byte[]查找快。” 这不应该是这样的,除非byte数组查找以低效的方式实现。基本上,在字符串中进行序数子字符串查找应该与在byte[]中进行子数组查找执行更多或更少相同,如果两者都实现得一样好。那么,您到底使用什么来进行byte[]查找?您是否使用https://learn.microsoft.com/en-us/dotnet/api/system.memoryextensions.indexof#system-memoryextensions-indexof-1(system-readonlyspan((-0))-system-readonlyspan((-0)))? - user19858830
1
考虑到在.NET中,char占16位,因此,当正确处理并且第一个匹配项位于相对于搜索空间的相同字节位置/内存地址时,将子字符串搜索与在长度为2 * string.Length的byte[]中搜索长度等于2 * substring.Length的子字节数组进行比较,应该可以看到非常相似的性能数据。 - user19858830
1
@TheodorZoulias 它们每个的大小在12-20字节之间。 - Tiano Manti
1
你对 Span<T>.IndexOf 的使用看起来很好。它的速度已经达到了极限(顺便说一下,它正在使用 SIMD 指令),除非你要搜索的字节数组遵循某种模式或结构,可以通过制定专门的搜索算法来利用这些模式和结构... - user19858830
1
@TheodorZoulias,我不确定您所说的“hard limit”是什么意思,但如果您指的是可能的最大模式大小,那么是的。最大为20字节。 - Tiano Manti
显示剩余10条评论
2个回答

6
我假设你在一个接着一个地查找模式。也就是说,你正在文件的每个位置进行100万次的模式搜索!考虑改变方法。遍历一次文件字节,并确定当前位置是否为模式的起始位置。为了高效实现此操作,建议将模式组织成一个数组的列表形式,每个模式存储在数组索引`256 * byte[0] + byte[1]`的列表中。对于拥有100万个模式的情况,你会在每个数组位置的列表中平均拥有152个模式。这样可以快速查询。
您还可以使用前3个字节(`256 *(256 * byte [0] + byte [1])+ byte [2]`),从而得到长度为256 ^ 3〜1600万的数组(我使用更长的数组;对于C#没有问题)。 然后,您将平均少于一个模式每个数组位置。这将导致几乎线性的搜索时间O(n),相对于文件长度而言,这是与简单算法的二次O(num_of_patterns * file_length)相比巨大的改进。我们可以使用简单的逐字节比较来比较模式,因为我们可以从已知位置开始比较。(Boyer Moore在此无用。)
2字节索引(模式必须至少为2个字节长)
byte[] file = { 23, 36, 43, 76, 125, 56, 34, 234, 12, 3, 5, 76, 8, 0, 6, 125, 234, 56, 211, 122, 22, 4, 7, 89, 76, 64, 12, 3, 5, 76, 8, 0, 6, 125 };
byte[][] patterns = {
    new byte[]{ 12, 3, 5, 76, 8, 0, 6, 125, 11 },
    new byte[]{ 211, 122, 22, 4 },
    new byte[]{ 17, 211, 5, 8 },
    new byte[]{ 22, 4, 7, 89, 76, 64 },
};
var patternMatrix = new List<byte[]>[256 * 256];

// Add patterns to matrix.
// We assume pattern.Length >= 2.
foreach (byte[] pattern in patterns) {
    int index = 256 * pattern[0] + pattern[1];
    patternMatrix[index] ??= new List<byte[]>(); // Ensure we have a list.
    patternMatrix[index].Add(pattern);
}

// The search. Loop through the file
for (int fileIndex = 0; fileIndex < file.Length - 1; fileIndex++) { // Length - 1 because we need 2 bytes.
    int patternIndex = 256 * file[fileIndex] + file[fileIndex + 1];
    List<byte[]> candiatePatterns = patternMatrix[patternIndex];
    if (candiatePatterns != null) {
        foreach (byte[] candidate in candiatePatterns) {
            if (fileIndex + candidate.Length <= file.Length) {
                bool found = true;
                // We know that the 2 first bytes are matching,
                // so let's start at the 3rd
                for (int i = 2; i < candidate.Length; i++) {
                    if (candidate[i] != file[fileIndex + i]) {                             
                        found = false;
                        break;
                    }
                }
                if (found) {
                    Console.WriteLine($"pattern {{{candidate[0]}, {candidate[1]} ..}} found at file index {fileIndex}");
                }
            }
        }
    }
}

同样的算法,只需3个字节即可(速度更快!)

3字节索引(模式必须至少为3个字节长)

var patternMatrix = new List<byte[]>[256 * 256 * 256];

// Add patterns to matrix.
// We assume pattern.Length >= 3.
foreach (byte[] pattern in patterns) {
    int index = 256 * 256 * pattern[0] + 256 * pattern[1] + pattern[2];
    patternMatrix[index] ??= new List<byte[]>(); // Ensure we have a list.
    patternMatrix[index].Add(pattern);
}

// The search. Loop through the file
for (int fileIndex = 0; fileIndex < file.Length - 2; fileIndex++) { // Length - 2 because we need 3 bytes.
    int patternIndex = 256 * 256 * file[fileIndex] + 256 * file[fileIndex + 1] + file[fileIndex + 2];
    List<byte[]> candiatePatterns = patternMatrix[patternIndex];
    if (candiatePatterns != null) {
        foreach (byte[] candidate in candiatePatterns) {
            if (fileIndex + candidate.Length <= file.Length) {
                bool found = true;
                // We know that the 3 first bytes are matching,
                // so let's start at the 4th
                for (int i = 3; i < candidate.Length; i++) {
                    if (candidate[i] != file[fileIndex + i]) {
                        found = false;
                        break;
                    }
                }
                if (found) {
                    Console.WriteLine($"pattern {{{candidate[0]}, {candidate[1]} ..}} found at file index {fileIndex}");
                }
            }
        }
    }
}

为什么它速度更快?
简单的嵌套循环算法最多比较 ~ 70^6 * 10^6 = 7 * 10^14(700万亿)个模式!其中70^6是文件的长度,10^6是模式数量。
我的算法使用2字节索引,进行 ~ 70^6 * 152 = 10^10 次模式比较。数字152来自于每个2字节索引中平均有152个模式 ~ 10^6 / (256 * 256)。这样就快了65,536倍。
使用3字节,你将得到少于70^6个模式比较的结果,这是快了超过1000万倍。原因在于我们将所有的模式都存储在一个数组中,该数组的长度大于模式数(10百万或更少)。因此,在文件中任何一个字节位置加上后面的两个位置,我们只能选取以相同3字节开头的模式。这平均只有不到一个模式。有时可能是0或1个,有时是2或3个,但在任何数组位置上很少有更多的模式。
试一试吧。移位从 O(n^2) 到接近 O(n)。初始化时间为 O(n)。假设模式的前2或3个字节比较随机分布。如果不是这样,我的算法在最坏情况下将退化为 O(n^2)
好了,这是理论。由于3字节索引版本在初始化时较慢,它可能只在处理大型数据集时有优势。使用 Span<byte> 可以进行其他改进。
参见:大O标记法 - 维基百科

谢谢您的回复。不过,我感觉我做错了什么。我已经用200k个模式测试了算法,只用了大约1分钟就完成了。请注意,算法背后的逻辑超出了我的理解范围。以下是我使用您的算法的方式,请告诉我是否出了问题。https://codeshare.io/VZqvg8 - Tiano Manti
我已经用几个随机放置在文件中的模式测试了算法。200k个样本,大约需要1分钟就能完成。我简直不敢相信这个算法有多快。我还在自我怀疑。我会确保一切都正常运作,然后再接受这个答案。 - Tiano Manti
1
比看起来简单。我将模式存储在一个巨大的数组patternMatrix中。数组元素是模式列表(如果没有存储模式,则为null)。模式的开头(2或3个字节)成为此数组中的索引。然后,我循环遍历文件字节。在每个文件位置,我取当前字节加上接下来的1或2个字节(取决于是否使用2或3个字节索引)再次计算索引。然后,我不是比较所有1000万个模式,而是仅比较patternMatrix中此数组索引处的模式。 - Olivier Jacot-Descombes
2
@TheodorZoulias,我在我的回答末尾提到了这个限制。感谢您的指出。 - Olivier Jacot-Descombes
1
没有问题,只需报告进度,例如fileIndex % 10000 == 0。我做了一点改进。我们可以从索引3或4开始比较模式,因为我们知道前2或3个字节是相等的。在执行此操作时,奇怪的是2字节版本比3字节版本更快。可能是因为3字节版本的初始化速度较慢。这使2字节版本的性能提高了超过35%。 - Olivier Jacot-Descombes
显示剩余8条评论

1

一种想法是按照长度将模式分组,将每个组放入HashSet<byte[]>中以实现O(1)复杂度的搜索,然后逐个索引扫描源byte[]以获取所有组。由于在您的情况下组数很少(仅有9个组),因此此优化应该能够显著提高性能。以下是实现:

IEnumerable<byte[]> FindMatches(byte[] source, byte[][] patterns)
{
    Dictionary<int, HashSet<ArraySegment<byte>>> buckets = new();
    ArraySegmentComparer comparer = new();
    foreach (byte[] pattern in patterns)
    {
        HashSet<ArraySegment<byte>> bucket;
        if (!buckets.TryGetValue(pattern.Length, out bucket))
        {
            bucket = new(comparer);
            buckets.Add(pattern.Length, bucket);
        }
        bucket.Add(pattern); // Implicit cast byte[] => ArraySegment<byte> 
    }
    for (int i = 0; i < source.Length; i++)
    {
        foreach (var (length, bucket) in buckets)
        {
            if (i + length > source.Length) continue;
            ArraySegment<byte> slice = new(source, i, length);
            if (bucket.TryGetValue(slice, out var pattern))
            {
                yield return pattern.Array;
                bucket.Remove(slice);
            }
        }
    }
}

目前(.NET 6)标准库中没有可用于序列的相等比较器,因此您需要提供自定义比较器:

class ArraySegmentComparer : IEqualityComparer<ArraySegment<byte>>
{
    public bool Equals(ArraySegment<byte> x, ArraySegment<byte> y)
    {
        return x.AsSpan().SequenceEqual(y);
    }

    public int GetHashCode(ArraySegment<byte> obj)
    {
        HashCode hashcode = new();
        hashcode.AddBytes(obj);
        return hashcode.ToHashCode();
    }
}

该算法假设模式中没有重复项。如果有重复项,则只会发出其中一个重复项。
在我的(不是很快的)PC上,该算法需要约10秒钟来创建大小为12-20的10,000,000个模式的“buckets”字典,然后需要额外的5-6分钟来扫描大小为70,000,000的源byte[](大约每秒扫描200,000个字节)。模式数量不会影响扫描阶段(只要组数不增加)。
并行化此算法并不容易,因为“buckets”在扫描期间被改变。

谢谢回复。虽然我只给了算法10个模式,等了11分钟,但方法还没有完成。 - Tiano Manti
@Tiano Manti,很奇怪。您可以尝试在“for”循环内添加以下行:if (i % 100000 == 0) Console.WriteLine(i);,这样您就可以看到进展的视觉指示了。 - Theodor Zoulias
@TianoManti 顺便提醒一下,确保以发布模式运行程序。在调试模式下运行会变慢。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接