在C#中,最快(便携)分割数组的方法是什么?

6
我正在编写一个完全托管的Mercurial库(将用于即将推出的完全托管的Windows Mercurial服务器),而我遇到的最严重的性能问题之一是,奇怪的是,将数组分成几个部分。
思路如下:有一个字节数组,大小从几百字节到一兆字节不等,我所需要做的就是将其拆分为由特定字符\n分隔的部分。
现在,dotTrace向我展示的是,我的Split"优化"版本(代码是正确的,这里是我最初的简单版本)对于2300个调用需要11秒钟(显然,dotTrace本身引入了明显的性能损失,但所有东西都按比例缩放)。
以下是数字:
  • unsafe版本:对于2 312个调用,耗时11 297毫秒
  • 托管(“naive”)版本:对于2 312个调用,耗时20 001毫秒

所以问题来了:在C#中,最快的(最好是可移植的,即支持x86和x64)分割数组的方法是什么。


3
我们可以看一下你的“优化版”Split,这样我们就可以考虑如何改进它了吗? - L.B
2
您可以在“优化版本”链接中找到他的代码。 - Ofir Baruch
4
我可能漏掉了什么,但是为什么你要以8个字节为增量进行迭代?我认为最快的方法也是最简单的方法。通过一次数组迭代(逐字节),记录分隔索引。分配一个字节数组的数组,使用分隔索引之间的差作为每个单独字节数组的长度。然后使用某些内置函数从原始数组中复制内存。显然,绝对最快的方法是不进行拆分,而是创建一个可以像字节数组数组一样进行索引的结构,但通过偏移量引用原始分配内存中的内存。 - Gleno
2
首要任务,销毁和深埋LinkedList。它几乎从来没有比简单的List更高效,对于简单的附加操作,它永远不会更快。 - Konrad Rudolph
2
有这么多的 if... 你难道不可能成为分支预测的受害者吗? - Kuba Wyrostek
显示剩余19条评论
3个回答

4

我认为问题在于你在循环中做了很多复杂的操作。这段代码除了单个加法和比较以外,删除了所有其他操作。只有在检测到拆分或数组结尾时才会发生其他复杂操作。

此外,很难确定你使用什么样的数据来运行测试,所以这只是猜测。

public static unsafe Segment[] Split2(byte[] _src, byte value)
{
    var _ln = _src.Length;

    if (_ln == 0) return new Segment[] { };

    fixed (byte* src = _src)
    {
        var segments = new LinkedList<Segment>(); // Segment[c];

        byte* last = src;
        byte* end = src + _ln - 1;
        byte lastValue = *end;
        *end = value; // value-termination

        var cur = src;
        while (true)
        {
            if (*cur == value)
            {
                int begin = (int) (last - src);
                int length = (int) (cur - last + 1);
                segments.AddLast(new Segment(_src, begin, length));

                last = cur + 1;

                if (cur == end)
                {
                    if (lastValue != value)
                    {
                        *end = lastValue;
                    }
                    break;
                }
            }
            cur++;
        }

        return segments.ToArray();
    }
}

编辑: 修复代码,以返回正确的结果。


这是一个非常巧妙的哨兵使用方式..然而,只是一个条件/字节的重复..看到时间会很好。 - user166390
不错!在 dotTrace 下,2,312 次调用只用了 4,696 毫秒。 - Anton Gogolev
@pst:这几乎比我的“不安全”版本快三倍。 - Anton Gogolev
您正在使用终止值覆盖数组的最后一个字节。我不确定这是否有效。更节省数据的方法是循环到结尾,如果最后一个元素不是终止字符,则也将其复制。 - Alois Kraus
@AloisKraus 但这样每个循环将会有两次比较而不是一次。这将导致性能下降约50%。 - Euphoric
你可以检查数组是否被终止。如果没有被终止,就使用这两个比较。否则,您可以使用最快的快速路径。 - Alois Kraus

3
对于Split函数,在32位机器上处理ulong类型会非常慢,因此应该减少使用并改为使用uint类型。如果真的需要使用ulong,则应该实现两个版本,一个用于32位,一个用于64位。
您还应该测量一下每次处理一个字节是否更快。
需要分析内存分配的成本。如果成本足够大,请尝试在多个调用之间重复使用内存。
其他:
ToString:最好使用"(" + Offset.ToString() + ", " + Length.ToString() + ")"来提高速度。
GetHashCode: 尝试使用fixed(byte * b = & buffer[offset])
如果多次使用,此版本应该非常快。关键点:在内部数组扩展到合适的大小后不再进行新的内存分配,数据拷贝最小化。
class ArraySplitter
{
    private byte[] m_data;
    private int    m_count;
    private int[]  m_stops;

    private void AddRange(int start, int stop)
    {
        // Skip empty range
        if (start > stop)
        {
            return;
        }

        // Grow array if needed
        if ((m_stops == null) || (m_stops.Length < (m_count + 2)))
        {
            int[] old = m_stops;

            m_stops = new int[m_count * 3 / 2 + 4];

            if (old != null)
            {
                old.CopyTo(m_stops, 0);
            }
        }

        m_stops[m_count++] = start;
        m_stops[m_count++] = stop;
    }

    public int Split(byte[] data, byte sep)
    {
        m_data  = data;
        m_count = 0;      // reuse m_stops

        int last = 0;

        for (int i = 0; i < data.Length; i ++)
        {
            if (data[i] == sep)
            {
                AddRange(last, i - 1);
                last = i + 1;
            }
        }

        AddRange(last, data.Length - 1);

        return m_count / 2;
    }

    public ArraySegment<byte> this[int index]
    {
        get
        {
            index *= 2;
            int start = m_stops[index];

            return new ArraySegment<byte>(m_data, start, m_stops[index + 1] - start + 1);
        }
    }
}

测试程序:

    static void Main(string[] args)
    {
        int count = 1000 * 1000;

        byte[] data = new byte[count];

        for (int i = 0; i < count; i++)
        {
            data[i] = (byte) i;
        }

        Stopwatch watch = new Stopwatch();

        for (int r = 0; r < 10; r++)
        {
            watch.Reset();
            watch.Start();

            int len = 0;

            foreach (var seg in data.MySplit(13))
            {
                len += seg.Count;
            }

            watch.Stop();

            Console.WriteLine("MySplit      : {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);

            watch.Reset();
            watch.Start();

            ArraySplitter splitter = new ArraySplitter();

            int parts = splitter.Split(data, 13);

            len = 0;

            for (int i = 0; i < parts; i++)
            {
                len += splitter[i].Count;
            }

            watch.Stop();
            Console.WriteLine("ArraySplitter: {0} {1,8:N3} ms", len, watch.Elapsed.TotalMilliseconds);
        }
    }

结果:

MySplit      : 996093    9.514 ms
ArraySplitter: 996093    4.754 ms
MySplit      : 996093    7.760 ms
ArraySplitter: 996093    2.710 ms
MySplit      : 996093    8.391 ms
ArraySplitter: 996093    3.510 ms
MySplit      : 996093    9.677 ms
ArraySplitter: 996093    3.468 ms
MySplit      : 996093    9.685 ms
ArraySplitter: 996093    3.370 ms
MySplit      : 996093    9.700 ms
ArraySplitter: 996093    3.425 ms
MySplit      : 996093    9.669 ms
ArraySplitter: 996093    3.519 ms
MySplit      : 996093    9.844 ms
ArraySplitter: 996093    3.416 ms
MySplit      : 996093    9.721 ms
ArraySplitter: 996093    3.685 ms
MySplit      : 996093    9.703 ms
ArraySplitter: 996093    3.470 ms

你真的认为自己的可调整大小缓冲区比框架提供的(S.C.G.List)更有效吗?- Konrad Rudolph昨天说。基本上是相同的事情,只是消除了一级间接性。此外,我认为List<T>的增长率100%太多了,所以我将其更改为50%的增长率。- Feng Yuan昨天说。内置集合在CLR中得到某种特殊处理(手动重新实现它们1:1会导致更慢的代码!)。我认为运行时会排除间接性。- Konrad Rudolph 17小时前 - Feng Yuan
上周我在程序集级别检查了数组和List<T>的访问。相信我,数组访问比List<T>访问快得多。 - Feng Yuan
此外,如果您为List<T>设置了一个足够估计的初始大小来避免过多的初始 X2 重设大小,则可以大大提高其性能。我改变了我之前发布的天真方法,不使用yield return并且有几千的初始大小(实际上我使用了每100个字节1个新行的估计),两者的性能都差不多 (仍然没有不安全实现快)。 - Pablo Romeo

2

安东,

我不知道你是否仍然对此进行优化,因为这个帖子已经相当老了,但是我看到你的代码在你的在线存储库中几乎是一样的,所以我想尝试一下。我在评估你的HgLab应用程序时查看了bitbucket.org上的HgSharp代码。我使用本机构建重写了函数,这大大简化了它。我的测试结果比你原来的例程快了一半以上。我通过加载一个几兆字节的源文件并将其与使用你原来的例程执行相同操作的性能进行比较来进行测试。

除了重写基本逻辑之外,我决定使用内置于框架中的本机ArraySegment<>而不是你的自定义实现。唯一的显着区别是ArraySegment<>公开了一个Count属性而不是Length属性。下面的代码不需要unsafe关键字,因为我没有使用指针,但是如果更改为这样做,则似乎有轻微的性能提高。


    public static ArraySegment<byte>[] SplitEx(this byte[] source, byte value) {
        var _previousIndex = -1;
        var _segments = new List<ArraySegment<byte>>();
        var _length = source.Length;

        if (_length > 0) {
            int _index;

            for (_index = 0; _index < _length; _index++) {
                var _value = source[_index];
                if (_value == value) {
                    _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
                    _previousIndex = _index;
                }
            }

            if (--_index != _previousIndex) {
                _segments.Add(new ArraySegment<byte>(source, _previousIndex + 1, _index - _previousIndex));
            }
        }

        return _segments.ToArray();
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接