使用最小的堆分配读取二进制格式的大型文件,并提取其中的文件。

8

对于标题感到抱歉,它可能有点令人困惑,但我不知道如何更好地解释它。

有两个扩展名为.cat(目录文件)和.dat的文件。 .cat文件包含.dat文件中二进制文件的信息。 此信息是文件的名称、大小、在.dat文件中的偏移量以及md5哈希。

例如.cat文件;

assets/textures/environments/asteroids/ast_crystal_blue_diff-small.gz 22387 1546955265 85a67a982194e4141e08fac4bf062c8f
assets/textures/environments/asteroids/ast_crystal_blue_diff.gz 83859 1546955265 86c7e940de82c2c2573a822c9efc9b6b
assets/textures/environments/asteroids/ast_crystal_diff-small.gz 22693 1546955265 cff6956c94b59e946b78419d9c90f972
assets/textures/environments/asteroids/ast_crystal_diff.gz 85531 1546955265 57d5a24dd4da673a42cbf0a3e8e08398
assets/textures/environments/asteroids/ast_crystal_green_diff-small.gz 22312 1546955265 857fea639e1af42282b015e8decb02db
assets/textures/environments/asteroids/ast_crystal_green_diff.gz 115569 1546955265 ee6f60b0a8211ec048172caa762d8a1a
assets/textures/environments/asteroids/ast_crystal_purple_diff-small.gz 14179 1546955265 632317951273252d516d36b80de7dfcd
assets/textures/environments/asteroids/ast_crystal_purple_diff.gz 53781 1546955265 c057acc06a4953ce6ea3c6588bbad743
assets/textures/environments/asteroids/ast_crystal_yellow_diff-small.gz 21966 1546955265 a893c12e696f9e5fb188409630b8d10b
assets/textures/environments/asteroids/ast_crystal_yellow_diff.gz 82471 1546955265 c50a5e59093fe9c6abb64f0f47a26e57
assets/textures/environments/asteroids/xen_crystal_diff-small.gz 14161 1546955265 23b34bdd1900a7e61a94751ae798e934
assets/textures/environments/asteroids/xen_crystal_diff.gz 53748 1546955265 dcb7c8294ef72137e7bca8dd8ea2525f
assets/textures/lensflares/lens_rays3_small_diff.gz 14107 1546955265 a656d1fad4198b0662a783919feb91a5

我相对容易地解析了那些文件,我使用了Span<T>,并经过一些BenchmarkDotNet的基准测试后,我认为我已经尽可能优化了这种类型文件的读取。

但是.dat文件就不同了。一个典型的.dat文件大小为GB级别。

我首先尝试了我能想到的最简单的方法。

(我删除了空值检查和验证代码,以使代码更易读。)

public async Task ExportAssetsAsync(CatalogFile catalogFile, string destDirectory, CancellationToken ct = default)
{
    IFileInfo catalogFileInfo = _fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = _fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = _fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = _fs.FileInfo.FromFileName(datFilePath);

    await using Stream stream = datFileInfo.OpenRead();
    
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = _fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = _fs.FileInfo.FromFileName(destFilePath);
        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }
        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        var newFileData = new byte[catalogEntry.AssetSize];
        int read = await stream.ReadAsync(newFileData, 0, catalogEntry.AssetSize, ct);
        if (read != catalogEntry.AssetSize)
        {
            _logger?.LogError("Could not read asset data from dat file: {DatFile}", datFilePath);
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }
        await using Stream destStream = _fs.File.Open(destFile.FullName, FileMode.Create);
        destStream.Write(newFileData);
        destStream.Close();
    }
}

可以猜到,这种方法既慢又会在堆上分配大量内存,从而使垃圾回收器忙碌。

我对上述方法进行了一些修改,尝试使用缓冲区进行读取,然后使用 stackalloc 和 Span 代替使用 new byte[catalogEntry.AssetSize] 进行分配。虽然在缓冲读取方面没有取得太多的进展,但当文件大小超过堆栈大小时,使用 stackalloc 很自然会导致 StackOverflow 异常。

然后经过一些研究,我决定可以使用 .NET Core 2.1 中引入的 System.IO.Pipelines。并将上述方法更改如下。

public async Task ExportAssetsPipe(CatalogFile catalogFile, string destDirectory, CancellationToken ct = default)
{
    IFileInfo catalogFileInfo = _fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = _fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = _fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");

    IFileInfo datFileInfo = _fs.FileInfo.FromFileName(datFilePath);
    
    await using Stream stream = datFileInfo.OpenRead();

    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = _fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = _fs.FileInfo.FromFileName(destFilePath);
        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }
        stream.Position = catalogEntry.ByteOffset;
        var reader = PipeReader.Create(stream);
        while (true)
        {
            ReadResult readResult = await reader.ReadAsync(ct);
            ReadOnlySequence<byte> buffer = readResult.Buffer;
            if (buffer.Length >= catalogEntry.AssetSize)
            {
                ReadOnlySequence<byte> entry = buffer.Slice(0, catalogEntry.AssetSize);
                await using Stream destStream = File.Open(destFile.FullName, FileMode.Create);
                foreach (ReadOnlyMemory<byte> mem in entry)
                {
                   await destStream.WriteAsync(mem, ct);
                }
                destStream.Close();
                break;
            }
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}

根据BenchmarkDotnet的数据显示,第二种方法的性能和内存分配比第一种方法更差。这可能是因为我在使用System.IO.Pipelines时使用方式不正确或不当导致的。由于我以前没有处理如此大型文件的输入/输出操作经验,所以对此并不熟悉。请问如何在最小的内存分配和最大的性能下实现我的目标?非常感谢您提前的帮助和正确的指导。

2
使用新的ArrayPool<T>,它在System.Buffers中,先研究如何使用它以避免内存泄漏。您需要始终从池中租用并归还,这将有助于内存分配。 - Mauricio Atanache
3
尝试此链接 https://adamsitnik.com/Array-Pool/ - Mauricio Atanache
1
Stream.CopyTo应该更简单,只需要一个缓冲区分配...(假设您只是想将大文件拆分成小文件)每次读取16-64k块可能是您需要的 - 如果要自己滚动,则有很多问题讨论IO优化(像现在一样在LOH上分配内存不是优化的方法 :)) - Alexei Levenkov
1
很高兴听到这个消息,我应该把它作为一个答案吗? - Mauricio Atanache
我想先尝试@AlexeiLevenkov的答案(使用子流)。 - Deniz İrgin
显示剩余7条评论
2个回答

7
首先,我感谢 Mauricio Atanache 和 Alexei Levenkov 提供的建议。在尝试他们两人建议的方法时,我学到了不少东西。在进行了基准测试后,我决定采用 Alexei Levenkov 建议的 SubStream 和 Stream.CopyTo 方法。
首先,我想分享解决方案。然后,那些好奇的人可以检查基准测试和结果。
解决方案
Alexei 指引我去看了一个旧问题,我审查了那里的解决方案,并将其调整为我的代码。 如何向用户公开流的子部分 首先,我需要一个 SubStream 实现,基本上我想做的是从一个大的 .dat 文件中提取小文件。通过使用 SubStream,我可以将文件从 FileStream 的所需偏移量处封装起来。然后,使用 Stream.Copy 方法,我可以将 SubStream 中的内容复制到另一个 FileStream 中并写入文件系统。使用这种方法,我只进行一次缓冲区分配。
public class SubStream : Stream
{
    private readonly Stream _baseStream;
    private readonly long _length;
    private long _position;

    public SubStream(Stream baseStream, long offset, long length)
    {
        if (baseStream == null)
        {
            throw new ArgumentNullException(nameof(baseStream), "Base stream cannot be null");
        }

        if (!baseStream.CanRead)
        {
            throw new ArgumentException("Base stream must be readable.", nameof(baseStream));
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(offset));
        }

        _baseStream = baseStream;
        _length = length;

        if (baseStream.CanSeek)
        {
            baseStream.Seek(offset, SeekOrigin.Current);
        }
        else
        {
            // read it manually...
            const int bufferSize = 512;
            var buffer = new byte[bufferSize];
            while (offset > 0)
            {
                int read = baseStream.Read(buffer, 0, offset < bufferSize ? (int)offset : bufferSize);
                offset -= read;
            }
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        CheckDisposed();
        long remaining = _length - _position;
        if (remaining <= 0)
        {
            return 0;
        }

        if (remaining < count)
        {
            count = (int)remaining;
        }
        
        int read = _baseStream.Read(buffer, offset, count);
        _position += read;
        
        return read;
    }

    private void CheckDisposed()
    {
        if (_baseStream == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
    }

    public override long Length
    {
        get
        {
            CheckDisposed();
            return _length;
        }
    }

    public override bool CanRead
    {
        get
        {
            CheckDisposed();
            return true;
        }
    }

    public override bool CanWrite
    {
        get
        {
            CheckDisposed();
            return false;
        }
    }

    public override bool CanSeek
    {
        get
        {
            CheckDisposed();
            return false;
        }
    }

    public override long Position
    {
        get
        {
            CheckDisposed();
            return _position;
        }
        set => throw new NotSupportedException();
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

    public override void SetLength(long value) => throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count) => throw new NotImplementedException();

    public override void Flush()
    {
        CheckDisposed();
        _baseStream.Flush();
    }
}

方法的最终版本如下。
private static void ExportAssets(CatalogFile catalogFile, string destDirectory)
{
    FileInfo catalogFileInfo = new FileInfo(catalogFile.FilePath);
    string catalogFileName = Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    FileInfo datFileInfo = new FileInfo(datFilePath);

    using Stream stream = datFileInfo.OpenRead();
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Path.Combine(destDirectory, catalogEntry.AssetPath);
        FileInfo destFile = new FileInfo(destFilePath);

        if (!destFile.Directory.Exists)
        {
            destFile.Directory.Create();
        }

        using var subStream = new SubStream(stream, catalogEntry.ByteOffset, catalogEntry.AssetSize);
        using Stream destStream = File.Open(destFile.FullName, FileMode.Create);
        subStream.CopyTo(destStream);
        destStream.Close();
    }
}

基准测试设置

我在进行基准测试时使用的设置

  • 我使用了两个不同的.dat文件,一个大小为600KB,另一个大小为550MB。
  • 在基准测试中,对文件系统进行写操作会导致结果波动。因此,我使用MemoryStream来模拟写操作。
  • 基准测试中包括了方法的同步和异步版本。
  • 我使用System.IO.Abstractions库来模拟文件IO操作以进行单元测试。请不要被以Fs.开头的方法调用所迷惑(例如Fs.FileInfo.FromFileName(catalogFile.FilePath))。

基准测试使用了三个不同版本的方法。

第一个版本是未优化的版本,它为.dat文件中的每个子文件分配new byte[]

private static void ExportAssetsUnoptimized(CatalogFile catalogFile, string destDirectory)
{
    IFileInfo catalogFileInfo = Fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = Fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = Fs.FileInfo.FromFileName(datFilePath);

    using Stream stream = datFileInfo.OpenRead();

    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = Fs.FileInfo.FromFileName(destFilePath);

        if (!destFile.Directory.Exists)
        {
            // destFile.Directory.Create();
        }

        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        var newFileData = new byte[catalogEntry.AssetSize];
        int read = stream.Read(newFileData, 0, catalogEntry.AssetSize);

        if (read != catalogEntry.AssetSize)
        {
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }

        // using Stream destStream = Fs.File.Open(destFile.FullName, FileMode.Create);
        using var destStream = new MemoryStream();
        destStream.Write(newFileData);
        destStream.Close();
    }
}

第二个是System.Buffer中的ArrayPool(由Mauricio Atanache建议)。ArrayPool<T>是托管数组的高性能池。您可以在System.Buffers包中找到它,并且其源代码可在GitHub上获得。它已经成熟并准备好在生产中使用。
有一篇很好的文章详细解释了这个主题。 使用ArrayPool池化大型数组 我仍然怀疑我是否正确地使用它或者是否用于其预期目的。但是当我像下面这样使用它时,我观察到它比上面未优化的版本更快,并且节省了一半的分配。
private static void ExportAssetsWithArrayPool(CatalogFile catalogFile, string destDirectory)
{
    IFileInfo catalogFileInfo = Fs.FileInfo.FromFileName(catalogFile.FilePath);
    string catalogFileName = Fs.Path.GetFileNameWithoutExtension(catalogFileInfo.Name);
    string datFilePath = Fs.Path.Combine(catalogFileInfo.DirectoryName, $"{catalogFileName}.dat");
    IFileInfo datFileInfo = Fs.FileInfo.FromFileName(datFilePath);

    ArrayPool<byte> bufferPool = ArrayPool<byte>.Shared;

    using Stream stream = datFileInfo.OpenRead();
    foreach (CatalogEntry catalogEntry in catalogFile.CatalogEntries)
    {
        string destFilePath = Fs.Path.Combine(destDirectory, catalogEntry.AssetPath);
        IFileInfo destFile = Fs.FileInfo.FromFileName(destFilePath);

        if (!destFile.Directory.Exists)
        {
            //destFile.Directory.Create();
        }

        stream.Seek(catalogEntry.ByteOffset, SeekOrigin.Begin);
        byte[] newFileData = bufferPool.Rent(catalogEntry.AssetSize);
        int read = stream.Read(newFileData, 0, catalogEntry.AssetSize);

        if (read != catalogEntry.AssetSize)
        {
            throw new DatFileReadException("Could not read asset data from dat file", datFilePath);
        }

        // using Stream destStream = Fs.File.Open(destFile.FullName, FileMode.Create);
        using Stream destStream = new MemoryStream();
        destStream.Write(newFileData, 0, catalogEntry.AssetSize);
        destStream.Close();
        bufferPool.Return(newFileData);
    }
}

第三个版本是最快且内存分配最少的版本。我所说的最小内存分配是指分配的内存少了约75倍,速度显著更快。

我已经在答案开头给出了此方法的代码示例并进行了解释。因此,我将跳过基准测试结果。

您可以从下面的gist链接中访问完整的Benchmarkdotnet设置。

https://gist.github.com/Blind-Striker/8f7e8ff56de6d9c2a4ab7a47ae423eba

基准测试结果

结论和免责声明

我得出的结论是,SubStream 和 Stream.CopyTo 方法分配的内存要少得多,运行速度也要快得多。可能部分分配是由于 Path.Combine

但是,我想提醒大家,直到我在 Stackoverflow 上发布这个问题之前,我从未使用过 ArrayPool。有可能我没有正确地使用它或者没有按照预期的目的使用它。我也不确定在将 MemoryStream 用作写入目标来保持基准测试一致时使用它的准确性是否高。

方法 文件大小 平均值 误差 标准偏差 Gen 0 Gen 1 Gen 2 分配的内存
ExportAssetsUnoptimized_Benchmark Large_5GB 563,034.4 微秒 13,290.13 微秒 38,977.64 微秒 140000.0000 140000.0000 140000.0000 1,110,966 KB
ExportAssetsWithArrayPool_Benchmark Large_5GB 270,394.1 微秒 5,308.29 微秒 6,319.15 微秒 5500.0000 4000.0000 4000.0000 555,960 KB
ExportAssetsSubStream_Benchmark Large_5GB 17,525.8 微秒 183.55 微秒 171.69 微秒 3468.7500 3468.7500 3468.7500 14,494 KB
ExportAssetsUnoptimizedAsync_Benchmark Large_5GB 574,430.4 微秒 20,442.46 微秒 59,954.20 微秒 133000.0000 133000.0000 133000.0000 1,111,298 KB
ExportAssetsWithArrayPoolAsync_Benchmark Large_5GB 237,256.6 微秒 5,673.63 微秒 16,728.82 微秒 1500.0000 - - 556,088 KB
ExportAssetsSubStreamAsync_Benchmark Large_5GB 32,766.5 微秒 636.08 微秒 732.51 微秒 3187.5000 2562.5000 2562.5000 15,186 KB

2

使用新的ArrayPool,它在System.Buffers中。首先研究如何使用它以避免内存泄漏。

您需要始终从池中租用并归还,这将在内存分配方面有很大帮助。

尝试访问此链接adamsitnik.com/Array-Pool进行研究。


1
加油吧;你可以比那个更努力一些。"看看ArrayPool;这里有一个可能会消失的博客链接"作为评论是可以的,但作为答案质量较低;这个答案应该包含从博客中提取但与OP的情况相关的代码片段。 - Caius Jard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接