在.NET中处理大文件

4

问题

我需要使用C#保存和读取一个非常大的数据结构。这个结构本身相当简单;它是一个非常长的数组,由一些固定大小的简单结构体组成。

以下是一个例子,以便更好地理解:

struct st {
UInt32 a;
UInt16 b;
//etc.
} completion ports

st[] data = new st[1024*1024*100]

我希望能够尽可能快速和高效地将它们保存和加载到文件中。我的想法是将数据切成片段,将这些片段分配给任务,并异步地将它们写入文件。FileStream.WriteAsync似乎非常适合这个任务。但是,我在读取方面遇到了问题。从FileStream.ReadAsync API中可以看出,结果完全可以在每个结构的中间被截断,甚至在一个原始数据类型的一半处。当然,我可以解决这个问题,但我不确定什么方法最好,以及我会干扰操作系统的缓冲机制多少。最终,我计划使用MemoryStream.MemoryStream(byte[])从每个缓冲区创建一个MemoryStream,并使用二进制读取器将每个缓冲区读入结构体中。
那么,有什么更好的解决方法吗?我的方向正确吗?是否有更好的解决方案?如果有代码示例和链接,将不胜感激...
经过性能测试后,我发现使用BinaryReader读取文件或使用多个读取器进行FileStream.ReadAsync读取,大致具有相同的性能。因此,这个问题是没有意义的。

问题解释得很清楚。也许你可以从这里获得一些想法:https://dev59.com/G3I95IYBdhLWcg3wxA8- - Javiere
3个回答

3
您最大的瓶颈将是IO,必须以独占式访问文件来执行。实际的字节处理速度很快 - 直接将数据写入文件会比在内存中序列化不同部分,然后将每个部分单独复制到流中要好得多(注意FileStream本身有一个缓冲区,或者您可以添加一个额外的层级使用BufferedStream)。
我的建议:只在单线程中编写数据。坦率地说,我甚至不确定是否需要使用异步代码(提示:异步代码会增加开销),尤其是如果缓冲区跟上了的话。我也不会使用BiaryWriter/BinaryReader - 我只会直接写原始数据。您可以使用一些unsafe代码来按块复制数据,以避免查看单个对象,但这是较困难的... 我会尝试做一个例子。
这里是一个读/写的示例,首先注意性能:
Write: 2012ms
Read: 1089ms
File: 838,860,804 bytes

代码:

[DllImport("msvcrt.dll", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl, SetLastError = false)]
public static extern IntPtr memcpy(IntPtr dest, IntPtr src, UIntPtr count);

unsafe static st[] Read(string path)
{
    using (var file = File.OpenRead(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (byte* bufferPtr = buffer)
        {
            Fill(file, buffer, 0, 4);
            int len = ((int*)bufferPtr)[0];

            st[] result = new st[len];
            fixed (st* dataPtr = result)
            {
                st* rawPtr = dataPtr;
                IntPtr source= new IntPtr(bufferPtr);
                while (len >= BLOCK_SIZE)
                {
                    Fill(file, buffer, 0, buffer.Length);
                    memcpy(new IntPtr(rawPtr), source, bufferLen);
                    len -= BLOCK_SIZE;
                    rawPtr += BLOCK_SIZE;
                }
                if (len > 0)
                {
                    Fill(file, buffer, 0, len * size);
                    memcpy(new IntPtr(rawPtr), source, new UIntPtr((uint)(len * size)));
                }
            }
            return result;
        }
    }


}
static void Fill(Stream source, byte[] buffer, int offset, int count)
{
    int read;
    while (count > 0 && (read = source.Read(buffer, offset, count)) > 0)
    {
        offset += read;
        count -= read;
    }
    if (count > 0) throw new EndOfStreamException();
}

unsafe static void Write(st[] data, string path)
{
    using (var file = File.Create(path))
    {
        int size = sizeof(st);
        const int BLOCK_SIZE = 512; // process at a time
        byte[] buffer = new byte[BLOCK_SIZE * size];

        int len = data.Length;
        UIntPtr bufferLen = new UIntPtr((uint)buffer.Length);
        fixed (st* dataPtr = data)
        fixed (byte* bufferPtr = buffer)
        {
            // write the number of elements
            ((int*)bufferPtr)[0] = data.Length;
            file.Write(buffer, 0, 4);

            st* rawPtr = dataPtr;
            IntPtr destination = new IntPtr(bufferPtr);
            // write complete blocks of BLOCK_SIZE
            while (len >= BLOCK_SIZE)
            {
                memcpy(destination, new IntPtr(rawPtr), bufferLen);
                len -= BLOCK_SIZE;
                rawPtr += BLOCK_SIZE;
                file.Write(buffer, 0, buffer.Length);
            }
            if (len > 0)
            {   // write an incomplete block, if necessary
                memcpy(destination, new IntPtr(rawPtr), new UIntPtr((uint)(len * size)));
                file.Write(buffer, 0, len * size);
            }
        }
    }
}

我认为单线程写入也可以,我更关心的是读取...为什么需要锁定任何东西?FileStream.SomethingAsync不是使用完成端口吗? - AK_
@AK_ 一个文件只能存在于一个位置;你需要知道你正在读写文件的连续部分;顺便说一下,我已经添加了完整的读/写实现。 - Marc Gravell
@MarcGravell 实际上,我不需要知道我正在读取(或写入)文件的连续部分。我只需要知道我已经读取的缓冲区位置以及我的数据中相应的位置... - AK_
@AK_ 再次强调,你忽略了我的关键点:无论你如何做,主要的开销都是IO。在上面的例子中,我只是在进行IO和memcpy操作 - 后者非常快。将其切片/分解为并发任务不会带来任何收益,因为根本没有工作需要完成。通过添加并发,您所能实现的仅仅是增加:寻道时间;需要多个缓冲区;需要独占访问(是的,您需要这个)同时读取/写入...所有这些都是额外的开销。 - Marc Gravell
我只是想了解你对这个东西的经验……我的意思并不是不尊重 :-) - AK_
显示剩余8条评论

3

[编辑]我更新了这篇文章,包括一个完整可编译的示例,并回答了@Daniel在下面评论中提出的问题。因此,此代码不再使用任何“危险”方法,并且没有代码分析警告。 [/编辑]

如果您的结构体仅包含可平置类型,则可以通过使用封送来直接将数据读入数组,而无需进行其他副本操作,如下所示(完整可编译示例):

using System;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Microsoft.Win32.SafeHandles;

namespace ConsoleApplication1
{
    internal class Program
    {
        struct TestStruct // Mutable for brevity; real structs should be immutable.
        {
            public byte   ByteValue;
            public short  ShortValue;
            public int    IntValue;
            public long   LongValue;
            public float  FloatValue;
            public double DoubleValue;
        }

        static void Main()
        {
            var array = new TestStruct[10];

            for (byte i = 0; i < array.Length; ++i)
            {
                array[i].ByteValue   = i;
                array[i].ShortValue  = i;
                array[i].IntValue    = i;
                array[i].LongValue   = i;
                array[i].FloatValue  = i;
                array[i].DoubleValue = i;
            }

            Directory.CreateDirectory("C:\\TEST");

            using (var output = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Create))
                FastWrite(output, array, 0, array.Length);

            using (var input = new FileStream(@"C:\TEST\TEST.BIN", FileMode.Open))
                array = FastRead<TestStruct>(input, array.Length);

            for (byte i = 0; i < array.Length; ++i)
            {
                Trace.Assert(array[i].ByteValue   == i);
                Trace.Assert(array[i].ShortValue  == i);
                Trace.Assert(array[i].IntValue    == i);
                Trace.Assert(array[i].LongValue   == i);
                Trace.Assert(array[i].FloatValue  == i);
                Trace.Assert(array[i].DoubleValue == i);
            }
        }

        /// <summary>
        /// Writes a part of an array to a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream to which to write.</param>
        /// <param name="array">The array containing the data to write.</param>
        /// <param name="offset">The offset of the start of the data in the array to write.</param>
        /// <param name="count">The number of array elements to write.</param>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static void FastWrite<T>(FileStream fs, T[] array, int offset, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));
            GCHandle gcHandle = GCHandle.Alloc(array, GCHandleType.Pinned);

            try
            {
                uint bytesWritten;
                uint bytesToWrite = (uint)(count * sizeOfT);

                if
                (
                    !WriteFile
                    (
                        fs.SafeFileHandle,
                        new IntPtr(gcHandle.AddrOfPinnedObject().ToInt64() + (offset*sizeOfT)),
                        bytesToWrite,
                        out bytesWritten,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to write file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesWritten == bytesToWrite);
            }

            finally
            {
                gcHandle.Free();
            }
        }

        /// <summary>
        /// Reads array data from a file stream as quickly as possible,
        /// without making any additional copies of the data.
        /// </summary>
        /// <typeparam name="T">The type of the array elements.</typeparam>
        /// <param name="fs">The file stream from which to read.</param>
        /// <param name="count">The number of elements to read.</param>
        /// <returns>
        /// The array of elements that was read. This may be less than the number that was
        /// requested if the end of the file was reached. It may even be empty.
        /// NOTE: There may still be data left in the file, even if not all the requested
        /// elements were returned - this happens if the number of bytes remaining in the
        /// file is less than the size of the array elements.
        /// </returns>
        /// <exception cref="IOException">Thrown on error. See inner exception for <see cref="Win32Exception"/></exception>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Reliability", "CA2004:RemoveCallsToGCKeepAlive")]

        public static T[] FastRead<T>(FileStream fs, int count) where T: struct
        {
            int sizeOfT = Marshal.SizeOf(typeof(T));

            long bytesRemaining  = fs.Length - fs.Position;
            long wantedBytes     = count * sizeOfT;
            long bytesAvailable  = Math.Min(bytesRemaining, wantedBytes);
            long availableValues = bytesAvailable / sizeOfT;
            long bytesToRead     = (availableValues * sizeOfT);

            if ((bytesRemaining < wantedBytes) && ((bytesRemaining - bytesToRead) > 0))
            {
                Debug.WriteLine("Requested data exceeds available data and partial data remains in the file.", "Dmr.Common.IO.Arrays.FastRead(fs,count)");
            }

            T[] result = new T[availableValues];

            if (availableValues == 0)
                return result;

            GCHandle gcHandle = GCHandle.Alloc(result, GCHandleType.Pinned);

            try
            {
                uint bytesRead;

                if
                (
                    !ReadFile
                    (
                        fs.SafeFileHandle,
                        gcHandle.AddrOfPinnedObject(),
                        (uint)bytesToRead,
                        out bytesRead,
                        IntPtr.Zero
                    )
                )
                {
                    throw new IOException("Unable to read file.", new Win32Exception(Marshal.GetLastWin32Error()));
                }

                Debug.Assert(bytesRead == bytesToRead);
            }

            finally
            {
                gcHandle.Free();
            }

            return result;
        }

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool WriteFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToWrite,
            out uint             lpNumberOfBytesWritten,
            IntPtr               lpOverlapped
        );

        /// <summary>See the Windows API documentation for details.</summary>

        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Interoperability", "CA1415:DeclarePInvokesCorrectly")]
        [DllImport("kernel32.dll", SetLastError=true)]
        [return: MarshalAs(UnmanagedType.Bool)]

        private static extern bool ReadFile
        (
            SafeFileHandle       hFile,
            IntPtr               lpBuffer,
            uint                 nNumberOfBytesToRead,
            out uint             lpNumberOfBytesRead,
            IntPtr               lpOverlapped
        );
    }
}

您可以创建一个BlockingCollection来存储输入的数据,然后使用一个线程将其填充,再使用另一个线程来消费它。

将数据读入队列的线程可能如下所示:

public void ReadIntoQueue<T>(FileStream fs, BlockingCollection<T[]> queue, int blockSize) where T: struct
{
    while (true)
    {
        var data = FastRead<T>(fs, blockSize);

        if (data.Length == 0)
        {
            queue.CompleteAdding();
            break;
        }

        queue.Add(data);
    }
}

消费线程会像这样从队列中删除内容:

public void ProcessDataFromQueue<T>(BlockingCollection<T[]> queue) where T : struct
{
    foreach (var array in queue.GetConsumingEnumerable())
    {
        // Do something with 'array'
    }
}

直接使用不安全的代码来填充或读取数据会更快更简单,不是吗? - AK_
如果允许使用不安全的代码,那么可能是这样的,但该方法避免了不安全的代码在项目中传播。一旦一个程序集使用它,您必须在所有直接或间接使用它的项目上启用不安全的代码-对于某些网站(例如我们的网站)来说,这是不可接受的。这也可能没有你想象的那么容易! - Matthew Watson
1
@MatthewWatson 对于最后一点表示同意;我尝试了三次才成功地完成了我的示例(如下),而我对序列化和使用原始内存都非常熟悉... - Marc Gravell
1
@AK_ 这并不比意外指定超出数组索引或尝试读取文件流末尾更危险。使用我发布的代码无法破坏内存或任何东西。转换器会抛出异常。 - Matthew Watson
2
@AK_像往常一样,这是一个权衡。你的问题陈述为“尽可能快和高效”,我会回答说对于这个(问题),使用unsafe/memcpy方法可能(需要进行测量)是你能做到的最好的事情。然而,如果你真的非常反感使用 unsafe,那么你就不能这样做,在这种情况下,使用marshaller是一个很好的折中方案。无论哪种情况,老实说,我认为使用并发任务毫无裨益,我在这里作为一个做大量IO、大量序列化和大量并发的人讲话。 - Marc Gravell
显示剩余8条评论

1
据我所知,读写文件最快的方式是单向前进的过程。否则,磁盘必须在强制读/写的同时来回移动文件。当然,这并不意味着您不能在多个并发线程中处理数据。如果分段足够大,则磁盘移动的开销可能不会被注意到。

“做得对”,每个段的处理只是一个原始的memcpy...没什么可做的,所以不需要并发执行。 - Marc Gravell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接