如何加快二进制文件的反向扫描速度?

27

我有一个二进制文件规范,描述了一种分组数据结构。每个数据包都有一个两字节的同步模式,因此可以使用BinaryReaderFileStream组合进行分组的开始扫描:

while(!reader.EndOfFile)
{
    // Check for sync pattern.
    if (reader.ReadUInt16() != 0xEB25)
    {
        // Move to next byte.
        reader.BaseStream.Seek(-1, SeekOrigin.Current);
        continue;
    }

    // If we got here, a sync pattern was found.
}

这个过程在正向方向上运行得非常好,但是在反向扫描中进行类似的代码扫描至少慢了两个数量级:

while(!reader.BeginningOfFile)
{
    // Check for sync pattern.
    if (reader.ReadUInt16() != 0xEB25)
    {
        // Move to previous byte.
        reader.BaseStream.Seek(-3, SeekOrigin.Current);
        continue;
    }

    // If we got here, a sync pattern was found.
}

我尝试了几种解决方法,比如向后移动一定量(目前是1兆字节)并向前扫描,但显然我真正需要的是一个经过修改,读取方向既能向前又能向后拥有足够性能特征的BinaryReader或者FileStream

我已经有了一个FastFileStream,通过子类化普通的FileStream并缓存PositionLength属性改善了向前读取性能(它还提供了BeginningOfFileEndOfFile属性)。这就是上面代码中reader变量的驱动程序。

是否有类似的方法可以改善反向读取性能,可能通过将MemoryStream作为缓冲区来实现?


这个过程在正向方向上完美地工作。但它也很糟糕。读取两个字节,如果不是EB25,则回溯一个字节。 - L.B
你在这里可能不会有太多的好运。所有层面(库、操作系统、硬件)都是为正向读取进行优化的。你采取先大步后扫描前进的方法似乎是合理的。 - H H
2
然后我会尝试使用内存映射文件 - L.B
1
好的,你可以使用 FastForwardReverseFileStream。只是需要注意的是,使用 Memory Mapped file 并不意味着你要将整个文件内容加载到内存中。 - L.B
1
为什么不使用BufferedStream而使用FileStream?向前或向后移动并没有太大区别,因为BufferedStream将以块的形式读取文件,并且以正向或反向扫描块也不会有太大区别。即使是普通的FileStream,您也可以读取一个字节缓冲块并以相反的顺序读取它,在简单的for循环中通过递减索引实现。FileStream无论如何都使用4KB缓冲区,但它被优化用于正向读取。 - Akash Kava
显示剩余5条评论
2个回答

23

编辑:好的,我得到了一些代码。实际上是相当多的代码。它允许你向前和向后扫描数据包头。

我不能保证它没有漏洞,而且你肯定想调整缓冲区大小以查看其表现... 但是,给定你发送给我的同一文件,它至少在向前和向后扫描时显示出相同的数据包头位置 :)

在提供代码之前,我仍然建议如果可能的话,通过文件进行一次扫描并保存数据包信息的索引以备后用可能是更好的方法。

无论如何,这里是代码(包括除示例程序外没有测试):

PacketHeader.cs:

using System;

namespace Chapter10Reader
{
    public sealed class PacketHeader
    {
        private readonly long filePosition;
        private readonly ushort channelId;
        private readonly uint packetLength;
        private readonly uint dataLength;
        private readonly byte dataTypeVersion;
        private readonly byte sequenceNumber;
        private readonly byte packetFlags;
        private readonly byte dataType;
        private readonly ulong relativeTimeCounter;

        public long FilePosition { get { return filePosition; } }
        public ushort ChannelId { get { return channelId; } }
        public uint PacketLength { get { return packetLength; } }
        public uint DataLength { get { return dataLength; } }
        public byte DataTypeVersion { get { return dataTypeVersion; } }
        public byte SequenceNumber { get { return sequenceNumber; } }
        public byte PacketFlags { get { return packetFlags; } }
        public byte DataType { get { return dataType; } }
        public ulong RelativeTimeCounter { get { return relativeTimeCounter; } }

        public PacketHeader(ushort channelId, uint packetLength, uint dataLength, byte dataTypeVersion,
            byte sequenceNumber, byte packetFlags, byte dataType, ulong relativeTimeCounter, long filePosition)
        {
            this.channelId = channelId;
            this.packetLength = packetLength;
            this.dataLength = dataLength;
            this.dataTypeVersion = dataTypeVersion;
            this.sequenceNumber = sequenceNumber;
            this.packetFlags = packetFlags;
            this.dataType = dataType;
            this.relativeTimeCounter = relativeTimeCounter;
            this.filePosition = filePosition;
        }

        internal static PacketHeader Parse(byte[] data, int index, long filePosition)
        {
            if (index + 24 > data.Length)
            {
                throw new ArgumentException("Packet header must be 24 bytes long; not enough data");
            }
            ushort syncPattern = BitConverter.ToUInt16(data, index + 0);
            if (syncPattern != 0xeb25)
            {
                throw new ArgumentException("Packet header must start with the sync pattern");
            }
            ushort channelId = BitConverter.ToUInt16(data, index + 2);
            uint packetLength = BitConverter.ToUInt32(data, index + 4);
            uint dataLength = BitConverter.ToUInt32(data, index + 8);
            byte dataTypeVersion = data[index + 12];
            byte sequenceNumber = data[index + 13];
            byte packetFlags = data[index + 14];
            byte dataType = data[index + 15];
            // TODO: Validate this...
            ulong relativeTimeCounter =
                (ulong)BitConverter.ToUInt32(data, index + 16) +
                ((ulong)BitConverter.ToUInt16(data, index + 20)) << 32;
            // Assume we've already validated the checksum...
            return new PacketHeader(channelId, packetLength, dataLength, dataTypeVersion, sequenceNumber,
                packetFlags, dataType, relativeTimeCounter, filePosition);
        }

        /// <summary>
        /// Checks a packet header's checksum to see whether this *looks* like a packet header.
        /// </summary>
        internal static bool CheckPacketHeaderChecksum(byte[] data, int index)
        {
            if (index + 24 > data.Length)
            {
                throw new ArgumentException("Packet header must is 24 bytes long; not enough data");
            }
            ushort computed = 0;
            for (int i = 0; i < 11; i++)
            {
                computed += BitConverter.ToUInt16(data, index + i * 2);
            }
            return computed == BitConverter.ToUInt16(data, index + 22);
        }
    }
}

PacketScanner.cs:

using System;
using System.Diagnostics;
using System.IO;

namespace Chapter10Reader
{
    public sealed class PacketScanner : IDisposable
    {
        // 128K buffer... tweak this.
        private const int BufferSize = 1024 * 128;

        /// <summary>
        /// Where in the file does the buffer start?
        /// </summary>
        private long bufferStart;

        /// <summary>
        /// Where in the file does the buffer end (exclusive)?
        /// </summary>
        private long bufferEnd;

        /// <summary>
        /// Where are we in the file, logically?
        /// </summary>
        private long logicalPosition;

        // Probably cached by FileStream, but we use it a lot, so let's
        // not risk it...
        private readonly long fileLength;

        private readonly FileStream stream;
        private readonly byte[] buffer = new byte[BufferSize];        

        private PacketScanner(FileStream stream)
        {
            this.stream = stream;
            this.fileLength = stream.Length;
        }

        public void MoveToEnd()
        {
            logicalPosition = fileLength;
            bufferStart = -1; // Invalidate buffer
            bufferEnd = -1;
        }

        public void MoveToBeforeStart()
        {
            logicalPosition = -1;
            bufferStart = -1;
            bufferEnd = -1;
        }

        private byte this[long position]
        {
            get 
            {
                if (position < bufferStart || position >= bufferEnd)
                {
                    FillBuffer(position);
                }
                return buffer[position - bufferStart];
            }
        }

        /// <summary>
        /// Fill the buffer to include the given position.
        /// If the position is earlier than the buffer, assume we're reading backwards
        /// and make position one before the end of the buffer.
        /// If the position is later than the buffer, assume we're reading forwards
        /// and make position the start of the buffer.
        /// If the buffer is invalid, make position the start of the buffer.
        /// </summary>
        private void FillBuffer(long position)
        {
            long newStart;
            if (position > bufferStart)
            {
                newStart = position;
            }
            else
            {
                // Keep position *and position + 1* to avoid swapping back and forth too much
                newStart = Math.Max(0, position - buffer.Length + 2);
            }
            // Make position the start of the buffer.
            int bytesRead;
            int index = 0;
            stream.Position = newStart;
            while ((bytesRead = stream.Read(buffer, index, buffer.Length - index)) > 0)
            {
                index += bytesRead;
            }
            bufferStart = newStart;
            bufferEnd = bufferStart + index;
        }

        /// <summary>
        /// Make sure the buffer contains the given positions.
        /// 
        /// </summary>
        private void FillBuffer(long start, long end)
        {
            if (end - start > buffer.Length)
            {
                throw new ArgumentException("Buffer not big enough!");
            }
            if (end > fileLength)
            {
                throw new ArgumentException("Beyond end of file");
            }
            // Nothing to do.
            if (start >= bufferStart && end < bufferEnd)
            {
                return;
            }
            // TODO: Optimize this more to use whatever bits we've actually got.
            // (We're optimized for "we've got the start, get the end" but not the other way round.)
            if (start >= bufferStart)
            {
                // We've got the start, but not the end. Just shift things enough and read the end...
                int shiftAmount = (int) (end - bufferEnd);
                Buffer.BlockCopy(buffer, shiftAmount, buffer, 0, (int) (bufferEnd - bufferStart - shiftAmount));
                stream.Position = bufferEnd;
                int bytesRead;
                int index = (int)(bufferEnd - bufferStart - shiftAmount);
                while ((bytesRead = stream.Read(buffer, index, buffer.Length - index)) > 0)
                {
                    index += bytesRead;
                }
                bufferStart += shiftAmount;
                bufferEnd = bufferStart + index;
                return;
            }

            // Just fill the buffer starting from start...
            bufferStart = -1;
            bufferEnd = -1;
            FillBuffer(start);
        }

        /// <summary>
        /// Returns the header of the next packet, or null 
        /// if we've reached the end of the file.
        /// </summary>
        public PacketHeader NextHeader()
        {
            for (long tryPosition = logicalPosition + 1; tryPosition < fileLength - 23; tryPosition++)
            {
                if (this[tryPosition] == 0x25 && this[tryPosition + 1] == 0xEB)
                {
                    FillBuffer(tryPosition, tryPosition + 24);
                    int bufferPosition = (int) (tryPosition - bufferStart);
                    if (PacketHeader.CheckPacketHeaderChecksum(buffer, bufferPosition))
                    {
                        logicalPosition = tryPosition;
                        return PacketHeader.Parse(buffer, bufferPosition, tryPosition);
                    }
                }
            }
            logicalPosition = fileLength;
            return null;
        }

        /// <summary>
        /// Returns the header of the previous packet, or null 
        /// if we've reached the start of the file.
        /// </summary>
        public PacketHeader PreviousHeader()
        {
            for (long tryPosition = logicalPosition - 1; tryPosition >= 0; tryPosition--)
            {
                if (this[tryPosition + 1] == 0xEB && this[tryPosition] == 0x25)
                {
                    FillBuffer(tryPosition, tryPosition + 24);
                    int bufferPosition = (int)(tryPosition - bufferStart);
                    if (PacketHeader.CheckPacketHeaderChecksum(buffer, bufferPosition))
                    {
                        logicalPosition = tryPosition;
                        return PacketHeader.Parse(buffer, bufferPosition, tryPosition);
                    }
                }
            }
            logicalPosition = -1;
            return null;
        }

        public static PacketScanner OpenFile(string filename)
        {
            return new PacketScanner(File.OpenRead(filename));
        }

        public void Dispose()
        {
            stream.Dispose();
        }
    }
}

测试用Program.cs文件:

using System;
using System.Collections.Generic;
using System.Linq;

namespace Chapter10Reader
{
    class Program
    {
        static void Main(string[] args)
        {
            string filename = "test.ch10";

            Console.WriteLine("Forwards:");
            List<long> positionsForward = new List<long>();
            using (PacketScanner scanner = PacketScanner.OpenFile(filename))
            {
                scanner.MoveToBeforeStart();
                PacketHeader header;
                while ((header = scanner.NextHeader()) != null)
                {
                    Console.WriteLine("Found header at {0}", header.FilePosition);
                    positionsForward.Add(header.FilePosition);
                }
            }
            Console.WriteLine();
            Console.WriteLine("Backwards:");
            List<long> positionsBackward = new List<long>();
            using (PacketScanner scanner = PacketScanner.OpenFile(filename))
            {
                scanner.MoveToEnd();
                PacketHeader header;
                while ((header = scanner.PreviousHeader()) != null)
                {
                    positionsBackward.Add(header.FilePosition);
                }
            }
            positionsBackward.Reverse();
            foreach (var position in positionsBackward)
            {
                Console.WriteLine("Found header at {0}", position);
            }

            Console.WriteLine("Same? {0}", positionsForward.SequenceEqual(positionsBackward));
        }
    }
}

有一个数据包头,其中有一个字段指定了数据包的大小。我可以读取该数据包,或者使用大小跳到下一个数据包(如果它不是我想要的数据包)。这对于向前扫描来说非常好,前提是文件没有损坏,否则我必须退回到扫描同步模式。数据包的大小可达到半兆字节。如果您感兴趣,可以在此文档中详细查看数据包的规格说明,从10.6节开始。 - Robert Harvey
@RobertHarvey:谢谢,我今晚回家会看一下并尝试编写一些代码。你有一个样本数据文件可以让我玩吗? - Jon Skeet
数据包头存储了自身的简单校验和以及数据负载的校验和。验证数据包头校验和可以确定同步模式是否有效。这些文件非常大(以十几GB为单位);对每个文件进行预扫描以建立索引需要约30到60分钟。 - Robert Harvey
@RobertHarvey:是的,我知道。我的观点是,如果数据包恰好包含一个0xEB25字节序列,那么当将该部分数据解析为数据包时,它具有正确的校验和是相当不可能但并非不可能的。(而且真的 - 每个文件30分钟?在那段时间内你可以按顺序读取大量的数据。尤其是如果你实际上是要跳过数据部分。) - Jon Skeet
同步模式可以通过使用数据包长度向前移动并验证是否存在另一个同步模式来进一步确认。搜索操作需要在毫秒级别而不是分钟级别进行(它在UI中的“上一个”按钮上)。跳过每个数据包的数据负载可以将读取性能大约提高一倍,因此我的估计可能会偏差50%。 - Robert Harvey
显示剩余8条评论

16

L.B 在评论中提到使用内存映射文件,这可能会给您留下深刻的印象。

请尝试像这样做:

var memoryMapName = Path.GetFileName(fileToRead);

using (var mapStream = new FileStream(fileToRead, FileMode.Open))
{
    using (var myMap = MemoryMappedFile.CreateFromFile(
                            mapStream, 
                            memoryMapName, mapStream.Length,
                            MemoryMappedFileAccess.Read, null, 
                            HandleInheritability.None, false))
    {                    
        long leftToRead = mapStream.Length;
        long mapSize = Math.Min(1024 * 1024, mapStream.Length);
        long bytesRead = 0;
        long mapOffset = Math.Max(mapStream.Length - mapSize, 0);

        while (leftToRead > 1)
        {
            using (var FileMap = myMap.CreateViewAccessor(mapOffset, 
                                 mapSize, MemoryMappedFileAccess.Read))
            {
                long readAt = mapSize - 2;
                while (readAt > -1)
                {
                    var int16Read = FileMap.ReadUInt16(readAt);
                    //0xEB25  <--check int16Read here                            
                    bytesRead += 1;
                    readAt -= 1;
                }
            }

            leftToRead = mapStream.Length- bytesRead;
            mapOffset = Math.Max(mapOffset - mapSize, 0);
            mapSize = Math.Min(mapSize, leftToRead);
        }
    }
}

我已经为你的答案点了赞,因为我认为它很好,但请注意该问题有一个.NET 3.5标签,而我相信MemoryMappedFile仅在.NET 4.0中可用。 :) - Robert Harvey
公平地说,你在3.5中仍然可以调用映射文件API进行P/Invoke,所以如果这种方式有帮助,你仍然可以使用它,只需进行一些小的函数调用更改即可。 - Blindy
@RobertHarvey,您可以在3.5中实现它,但是您必须包装Win32函数 - Jetti
@RobertHarvey 啊,是的...我确实看到了3.5标签,但希望它不是绝对要求。我做了一些内存映射文件的测试,即使向前读取也比使用FileStream快得多。很抱歉这对您行不通。 - Eric Dahlvang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接