读写二进制文件的最快方法

15

我目前正在优化一个应用程序,其中一个经常执行的操作是读写二进制。 我需要两种类型的函数:

Set(byte[] target, int index, int value);

int Get(byte[] source, int index);

这些函数需要处理大端和小端的有符号和无符号short、int和long类型。

这里有一些我写的示例,但我需要评估它们的优缺点:

第一种方法是使用Marshal将值写入byte[]内存中,第二种方法是使用普通指针完成,第三种方法使用BitConverter和BlockCopy来实现。

unsafe void Set(byte[] target, int index, int value)
{
    fixed (byte* p = &target[0])
    {
        Marshal.WriteInt32(new IntPtr(p), index, value);
    }
}

unsafe void Set(byte[] target, int index, int value)
{
    int* p = &value;
    for (int i = 0; i < 4; i++)
    {
        target[offset + i] = *((byte*)p + i);
    }
}

void Set(byte[] target, int index, int value)
{
    byte[] data = BitConverter.GetBytes(value);
    Buffer.BlockCopy(data, 0, target, index, data.Length);
}

以下是读取/获取方法:

第一个方法使用Marshal从byte[]中读取值,第二个方法使用普通指针,第三个方法再次使用BitConverter:

unsafe int Get(byte[] source, int index)
{
    fixed (byte* p = &source[0])
    {
        return Marshal.ReadInt32(new IntPtr(p), index);
    }
}

unsafe int Get(byte[] source, int index)
{
    fixed (byte* p = &source[0])
    {
        return *(int*)(p + index);
    }
}

unsafe int Get(byte[] source, int index)
{
    return BitConverter.ToInt32(source, index);
}

目前我的问题中还未包括边界检查,但需要进行检查...

如果有人能告诉我在这种情况下最好和最快的方法是什么,或者给我一些其他解决方案来处理这个问题,那就太好了。通用解决方案更可取。


我刚做了一些性能测试,以下是结果:

设置 Marshal:45 毫秒,设置指针:48 毫秒,设置 BitConverter:71 毫秒 获取 Marshal:45 毫秒,获取指针:26 毫秒,获取 BitConverter:30 毫秒

看起来使用指针是最快的方式,但我认为 Marshal 和 BitConverter 做了一些内部检查... 有人可以验证一下吗?


1
你已经有了代码,为什么不用 Stopwatch 运行并测试一下呢? - Mehrdad Afshari
你说得对,我会快速地编辑我的问题。但这并不是我发帖的唯一目的。我正在寻找替代方案,也许还有通用的方法来解决这个问题。 - haze4real
2
对这个问题有所疑惑:转换为二进制只有在进行输入输出时才是必要的。 I/O 操作本身始终比操作比特位慢几个数量级。最佳优化也不可能为您带来超过几个百分点的改进。 - Hans Passant
4个回答

16

重要提示:如果你只需要一个字节序(大小端),请参考wj32/dtb的指针技巧。


个人而言,我会直接写入Stream(可能带有一些缓冲区),并重复使用共享的缓冲区,可以通常假设它是干净的。然后您可以快速访问索引0/1/2/3等。

当然不要使用BitConverter,因为它不能用于小端和大端,而您需要它们两者都支持。我也倾向于使用位移操作而不是不安全的处理方式等。根据以下基准测试结果表明,位移操作实际上是最快的(所以我很高兴这已经是我在代码中所采用的方式,请查看这里,查找EncodeInt32Fixed):

Set1: 371ms
Set2: 171ms
Set3: 993ms
Set4: 91ms <==== bit-shifting ;-p

代码:

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
static class Program
{
    static void Main()
    {
        const int LOOP = 10000000, INDEX = 100, VALUE = 512;
        byte[] buffer = new byte[1024];
        Stopwatch watch;

        watch = Stopwatch.StartNew();
        for (int i = 0; i < LOOP; i++)
        {
            Set1(buffer, INDEX, VALUE);
        }
        watch.Stop();
        Console.WriteLine("Set1: " + watch.ElapsedMilliseconds + "ms");

        watch = Stopwatch.StartNew();
        for (int i = 0; i < LOOP; i++)
        {
            Set2(buffer, INDEX, VALUE);
        }
        watch.Stop();
        Console.WriteLine("Set2: " + watch.ElapsedMilliseconds + "ms");

        watch = Stopwatch.StartNew();
        for (int i = 0; i < LOOP; i++)
        {
            Set3(buffer, INDEX, VALUE);
        }
        watch.Stop();
        Console.WriteLine("Set3: " + watch.ElapsedMilliseconds + "ms");

        watch = Stopwatch.StartNew();
        for (int i = 0; i < LOOP; i++)
        {
            Set4(buffer, INDEX, VALUE);
        }
        watch.Stop();
        Console.WriteLine("Set4: " + watch.ElapsedMilliseconds + "ms");

        Console.WriteLine("done");
        Console.ReadLine();
    }
    unsafe static void Set1(byte[] target, int index, int value)
    {
        fixed (byte* p = &target[0])
        {
            Marshal.WriteInt32(new IntPtr(p), index, value);
        }
    }

    unsafe static void Set2(byte[] target, int index, int value)
    {
        int* p = &value;
        for (int i = 0; i < 4; i++)
        {
            target[index + i] = *((byte*)p + i);
        }
    }

    static void Set3(byte[] target, int index, int value)
    {
        byte[] data = BitConverter.GetBytes(value);
        Buffer.BlockCopy(data, 0, target, index, data.Length);
    }
    static void Set4(byte[] target, int index, int value)
    {
        target[index++] = (byte)value;
        target[index++] = (byte)(value >> 8);
        target[index++] = (byte)(value >> 16);
        target[index] = (byte)(value >> 24);
    }
}

我认为Stream不是一个好的解决方案,问题在于可能需要寻找并且数据不总是按顺序读取和写入。另一个问题是字节序。 - haze4real
我需要先验证一下,如果可以的话,我会接受这个答案作为解决方案。那么获取/读取呢? - haze4real
6
同样的操作,但是相反的顺序;return ((int)buffer[index++]) | (((int)buffer[index++]) << 8) | (((int)buffer[index++]) << 16) | (((int)buffer[index]) << 24); (或者对移位操作从上到下进行调整以获取其他字节序)。请注意,我们提前将其转换为 int 类型,因为 int 算术比 byte 算术更快。 - Marc Gravell
我认为目前最好的解决方案是移位,最大的优点是易于进行大小端交换。 - haze4real

14

我在我的计算机上使用Marc Gravell的Set1Set4以及下面的Set5,得到以下数字:

Set1: 197ms
Set2: 102ms
Set3: 604ms
Set4: 68ms
Set5: 55ms <==== pointer magic ;-p

代码:

unsafe static void Set5(byte[] target, int index, int value)
{
    fixed (byte* p = &target[index])
    {
        *((int*)p) = value;                
    }
}
当字节数组不是在每次迭代时都被固定,而只是固定一次时,它的速度当然会变得更快。
Set6: 10ms (little endian)
Set7: 85ms (big endian)

代码:

if (!BitConverter.IsLittleEndian)
{
    throw new NotSupportedException();
}

watch = Stopwatch.StartNew();
fixed (byte* p = buffer)
{
    for (int i = 0; i < LOOP; i++)
    {
        *((int*)(p + INDEX)) = VALUE;
    }
}
watch.Stop();
Console.WriteLine("Set6: " + watch.ElapsedMilliseconds + "ms");

watch = Stopwatch.StartNew();
fixed (byte* p = buffer)
{
    for (int i = 0; i < LOOP; i++)
    {
        *((int*)(p + INDEX)) = System.Net.IPAddress.HostToNetworkOrder(VALUE);
    }
}
watch.Stop();
Console.WriteLine("Set7: " + watch.ElapsedMilliseconds + "ms");

这里的问题是由字节序交换引起的开销。 - haze4real
很棒,我已经添加了一个更新,但是字节序仍然是一个问题。我今天已经没有投票的机会了,但是虚拟+1。 - Marc Gravell
开销非常大,根据你的测试结果大约慢了8倍。在测试中不应该将字节数组固定在循环外部,因为需要测试的是函数本身...我无法在这些函数调用之间固定它。 - haze4real
是的,但只是因为你在循环外固定了数组,这只是模拟函数调用次数,并不是解决方案的一部分,因为函数本身在代码中的不同位置被调用。 - haze4real
2
如果你想要连续多次修改同一个数组,那么你只能在循环外固定该数组。如果在你的应用程序中不是这种情况,那么你就不能这样做。但是如果可以的话,性能提升是显而易见的。 - dtb
显示剩余2条评论

3

指针是解决问题的好方法。使用fixed关键字固定对象非常便宜,可以避免调用WriteInt32和BlockCopy等函数的开销。对于“通用解决方案”,您可以简单地使用void*并使用自己的memcpy(因为您处理的数据量很小)。但是指针不能与真正的泛型一起使用。


那么你显然没有正确编写你的代码。你真的认为使用位移(每个Int32至少需要大约8条指令)比使用简单的mov指令更快吗?而且我是在谈论将缓冲区固定在循环外部。 - wj32
1
使用以下代码(fixed (byte* b = array) { for (...) (int)(b + offset) = value; })来替换您的Get方法。如果您不相信这是最快的方法,请查看Disassembly窗口。 - wj32
1
啊,我误解了。但是要求是双端字节序。所以你必须为另一个端提供备用方案,并抽象出这两个方案的机制。我会保持简单并编写移位操作。这也引发了一个问题:“我是否要检查/支持大端硬件”等。 - Marc Gravell
不应该需要 for 循环,除非我漏掉了什么?只需获取 b + offset,交换为 int* 并赋值即可。 - Marc Gravell
1
遗憾的是,C#没有提供一种利用bswap指令实现大/小端兼容性的方法,因此在这种情况下,您的解决方案将是最快的。至于for循环,它的意思是表示固定引脚将在任何循环之外完成。 - wj32
显示剩余3条评论

1

你应该对你的代码进行一些分析,以确定这是否是瓶颈所在。此外,从你的代码来看,似乎你正在使用 .Net 函数调用将一个字节写入到一个非托管数组中,涉及到内存的固定和对不安全代码的调用...

如果可能的话,你最好声明一个 .Net System.IO.MemoryStream,并在其中进行寻址和写入操作,尽量使用流写入器来推送你的更改,这样可以减少函数调用并且不需要使用不安全代码。如果你在 C# 中进行类似 DSP 的操作,比如需要对数组中的每个值执行单个操作等,你会发现指针相关的内容更加有用。

编辑: 还要提醒一下,根据你所做的工作,CPU 缓存可能会起作用,如果你能够持续处理适合缓存的小块内存,那么你将获得最佳性能。


问题在于它可能成为瓶颈,因为应用程序正在与大量不同的网络设备通信,并且正在低成本机器上运行,其中一些设备使用重型协议,而其他设备则不使用。您知道一个好的方法来分析接口吗?问题将是网络的可变延迟。 - haze4real

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接