在C#中缓冲字节数据

20

我的应用程序从TCP套接字中读取字节并需要对其进行缓冲,以便稍后可以从中提取消息。由于TCP的特性,我可能会在一次读取中获得部分或多个消息,因此每次读取后,我想检查缓冲区并提取尽可能多的完整消息。

因此,我希望有一个类能够让我执行以下操作:

  • 将任意byte[]数据附加到其中
  • 检查内容而不消耗它,特别是检查内容的数量以及搜索某个特定的byte或bytes是否存在
  • 提取和消耗一部分数据作为byte[],同时将剩余的数据留在那里以供未来读取

我期望我想要的可以使用.NET库中的1个或多个现有类来实现,但我不确定哪些类。 System.IO.MemoryStream看起来接近我想要的,但是(a)不清楚它是否适合用作缓冲区(读取的数据是否从容量中删除?)和(b)读取和写入似乎发生在同一个位置-"流的当前位置是下一次读取或写入操作可以发生的位置。"-这不是我想要的。我需要从前面写入并从前面读取。


你想要追加任意字节数组吗?或者是任意字节?无论如何,也许List<byte[]>或List<byte>适合你。 - Jason
将字节数组合并为连续的字节数组。我认为List<byte>在这种应用程序中不够高效。 - Kylotan
我不知道有性能要求,Jon的建议看起来不错。 - Jason
嗯,List可能实际上还不错,经过检查实现后,但接口有点繁琐。 - Kylotan
缓冲流怎么样? - KRoy
8个回答

13

我建议你在内部使用MemoryStream,但是将其封装在另一个类中,该类存储:

  • MemoryStream
  • 当前的“读取”位置
  • 当前的“消费”位置

然后该类会公开以下内容:

  • 写:将流的位置设置为结尾,写入数据,将流的位置设置回读取位置
  • 读:读取数据,将读取位置设置为流的位置
  • 消费:更新已消费的位置(细节基于您正在尝试消费的方式);如果消费位置超过某个阈值,请将现有的缓冲数据复制到新的MemoryStream中并更新所有变量。(你可能不希望在每个消费请求上都复制缓冲区。)

请注意,如果没有额外的同步,这些操作都不是线程安全的。


1
使用MemoryStream相较于普通的byte[]数组的优点是什么?(好了,算了,我知道了:它可以自动调整大小) - vgru
1
@Groo:MemoryStream具有可扩展的容量(如果需要)。 - Oliver
什么是将现有MemoryStream的剩余部分复制到新MemoryStream的好方法? - Kylotan
@Kylotan:最快的方法可能是调用MemoryStream.GetBuffer获取底层缓冲区,然后使用普通的Write调用将一部分写入新流中。 - Jon Skeet
如果我使用List<byte[]>而不是MemoryStream,会有什么缺点? - Indigo_heart
显示剩余6条评论

12

只需使用一个大的字节数组和 Array.Copy - 就可以搞定了。 如果不行,就使用 List<byte>

如果你使用数组,你需要自己实现一个索引(用于复制额外数据),以及检查内容大小,但这很简单。

如果你感兴趣,这里有一个“循环缓冲区”的简单实现。测试应该会运行(我对它进行了一些单元测试,但没有检查所有关键路径):

public class ReadWriteBuffer
{
    private readonly byte[] _buffer;
    private int _startIndex, _endIndex;

    public ReadWriteBuffer(int capacity)
    {
        _buffer = new byte[capacity];
    }

    public int Count
    {
        get
        {
            if (_endIndex > _startIndex)
                return _endIndex - _startIndex;
            if (_endIndex < _startIndex)
                return (_buffer.Length - _startIndex) + _endIndex;
            return 0;
        }
    }

    public void Write(byte[] data)
    {
        if (Count + data.Length > _buffer.Length)
            throw new Exception("buffer overflow");
        if (_endIndex + data.Length >= _buffer.Length)
        {
            var endLen = _buffer.Length - _endIndex;
            var remainingLen = data.Length - endLen;

            Array.Copy(data, 0, _buffer, _endIndex, endLen);
            Array.Copy(data, endLen, _buffer, 0, remainingLen);
            _endIndex = remainingLen;
        }
        else
        {
            Array.Copy(data, 0, _buffer, _endIndex, data.Length);
            _endIndex += data.Length;
        }
    }

    public byte[] Read(int len, bool keepData = false)
    {
        if (len > Count)
            throw new Exception("not enough data in buffer");
        var result = new byte[len];
        if (_startIndex + len < _buffer.Length)
        {
            Array.Copy(_buffer, _startIndex, result, 0, len);
            if (!keepData)
                _startIndex += len;
            return result;
        }
        else
        {
            var endLen = _buffer.Length - _startIndex;
            var remainingLen = len - endLen;
            Array.Copy(_buffer, _startIndex, result, 0, endLen);
            Array.Copy(_buffer, 0, result, endLen, remainingLen);
            if (!keepData)
                _startIndex = remainingLen;
            return result;
        }
    }

    public byte this[int index]
    {
        get
        {
            if (index >= Count)
                throw new ArgumentOutOfRangeException();
            return _buffer[(_startIndex + index) % _buffer.Length];
        }
    }

    public IEnumerable<byte> Bytes
    {
        get
        {
            for (var i = 0; i < Count; i++)
                yield return _buffer[(_startIndex + i) % _buffer.Length];
        }
    }
}
请注意:该代码在读取时会“消耗”输入 - 如果您不希望这样,请删除"_startIndex = ..."部分(或使重载成为可选参数并进行检查等操作)。

看起来相当不错,唯一的缺点是在决定是否调用Read()时,我确实需要整个缓冲区的连续视图。有没有一种好的方法可以在不进行复制的情况下实现它?也许使用一个可枚举对象来迭代这两部分? - Kylotan
我更新了代码,这样你就可以选择保留读取的数据 - 这样可以吗?(你可以通过 obj.Read(obj.Count, true) 预览所有数据,或者更易读的方式是 obj.Read(obj.Count, keepData: true)) - Random Dev
是的,我知道我可以这样做,但我想避免每次都进行这些复制。这会增加很多额外的分配和释放内存的操作,而我作为一名游戏程序员,需要经常调用这个函数。 - Kylotan
与其在读取时抛出异常,我宁愿返回可用的内容(长度为Count)。在写入时,我会选择OverflowException,但这只是我的个人偏好。 - Lord of Scripts
就像你说的那样:你觉得怎么样都可以 - 但我会把他的行为称为TryRead或ReadAvaiable或其他什么。 - Random Dev
显示剩余5条评论

3
我认为BufferedStream是解决问题的方法。而且可以通过调用Seek跳过未读取的len字节数据。
BufferdStream buffer = new BufferedStream(tcpStream, size); // we have a buffer of size
...
...
while(...)
{
    buffer.Read(...);
    // do my staff
    // I read too much, I want to put back len bytes
    buffer.Seek(-len, SeekOrigin.End);
    // I shall come back and read later
}

内存增长

MemoryStream可以自动增长,与BufferedStream不同的是,BufferedStream需要初始指定size

记住流数据

MemoryStream一直保存所有的数据,而BufferedStream只保存一段流数据。

源流与字节数组

MemoryStream允许使用Write()方法添加输入字节,这些字节可以在未来用Read()方法读取。而BufferedSteam从构造函数中指定的另一个源流中获取输入字节。


1

来晚了,但为了后人记录:

过去我做过这件事,采用了稍微不同的方法。 如果 消息有一个固定的头部大小(告诉您正文有多少字节),并且考虑到网络流已经在缓冲,我将操作分为两个阶段:

  • 从流中读取头部字节
  • 基于头部,随后从流中读取正文字节
  • 重复进行上述两步

这利用了流的特点——当您请求'n'个字节时,您永远不会得到多余的字节,因此您可以忽略许多“哦,我读得太多了,请让我把它们放在下次读取”等问题。

现在这还不是全部,公平地说。我在流上有一个底层包装器类来处理碎片化问题(即如果要求4个字节,直到收到4个字节或流关闭才返回)。但那很容易。

在我看来,关键是将消息处理与流机制解耦,如果停止尝试从流中作为单个ReadBytes()来消耗消息,生活就变得简单得多。

无论您的读取是阻塞还是异步(APM/await),所有这些都是真实的。

我可能会遇到的问题是,一个"如果请求4个字节,则不返回直到收到四个字节"的方法对于我需要运行的这种系统来说并不实用,因为它不能在等待数据时挂起线程。我需要能够暂存部分数据并稍后返回,这需要像其他答案所建议的FIFO缓冲区。 (我的主要问题实际上是“为什么标准库中没有像这样的东西?”) - Kylotan

1

这是我写了一段时间的缓冲区的另一个实现

  • 可调整大小:允许排队数据而不抛出缓冲区溢出异常;
  • 高效:使用单个缓冲区和Buffer.Copy操作来入队/出队数据。

如果它允许任意长度的peek操作,那对我的目的来说将是非常好的。否则,我就很难知道是否可以安全地消耗数据。 - Kylotan

0

听起来你想从套接字中读取数据到一个内存流缓冲区,然后每次遇到特定的字节时,从缓冲区中取出数据并重置它。代码大致如下:

void ReceiveAllMessages(Action<byte[]> messageReceived, Socket socket)
{
    var currentMessage = new MemoryStream();
    var buffer = new byte[128];

    while (true)
    {
        var read = socket.Receive(buffer, 0, buffer.Length);
        if (read == 0)
            break;     // Connection closed

        for (var i = 0; i < read; i++)
        {
            var currentByte = buffer[i];
            if (currentByte == END_OF_MESSAGE)
            {
                var message = currentMessage.ToByteArray();
                messageReceived(message);

                currentMessage = new MemoryStream();
            }
            else
            {
                currentMessage.Write(currentByte);
            }
        }
    }
}

不,我不需要解析方面的帮助,只需要关于实际缓冲的帮助,但还是谢谢。 :) - Kylotan
如果消息被分成几个数据包,这种方法就行不通了。必须按照 OP 的要求使用 FIFO 缓冲区来实现。 - vgru
它会不断地将数据附加到同一个MemoryStream中,直到遇到END_OF_MESSAGE字节为止,这可能出现在第一个或第85个数据包中。 - Paul Stovell
明白了,我没有看到无限循环。但是,您仍然在同一个方法中混合了两个职责:接收和解析数据。如果没有“END_OF_MESSAGE”字节,但每个消息的长度取决于其内容(例如特定的起始cookie,然后是编码在消息内部的长度信息),该怎么办?这个问题必须分两步解决:1.只将字节排队到FIFO缓冲区的类,2.解析数据并出列的类。 - vgru

0

你可以使用一个包装了ConcurrentQueue<ArraySegment<byte>>Stream来实现这个功能(请记住,这使其仅支持向前)。然而,我真的不喜欢在处理数据之前将数据保存在内存中的想法;这会让你面临一堆攻击(有意或无意)关于消息大小。你可能还想Google“循环缓冲区”

实际上,你应该编写代码,尽快对接收到的数据执行有意义的操作:“推送解析”(例如,SAX就支持这种方式)。以下是如何使用文本进行此操作的示例:

private Encoding _encoding;
private Decoder _decoder;
private char[] _charData = new char[4];

public PushTextReader(Encoding encoding)
{
    _encoding = encoding;
    _decoder = _encoding.GetDecoder();
}

// A single connection requires its own decoder
// and charData. That connection should never
// call this method from multiple threads
// simultaneously.
// If you are using the ReadAsyncLoop you
// don't need to worry about it.
public void ReceiveData(ArraySegment<byte> data)
{
    // The two false parameters cause the decoder
    // to accept 'partial' characters.
    var charCount = _decoder.GetCharCount(data.Array, data.Offset, data.Count, false);
    charCount = _decoder.GetChars(data.Array, data.Offset, data.Count, _charData, 0, false);
    OnCharacterData(new ArraySegment<char>(_charData, 0, charCount));
}

如果您必须在反序列化之前能够接受完整的消息,可以使用MemoryMappedFile,它的优点是发送实体无法使您的服务器出现内存不足的情况。问题在于将文件重置为零;因为这可能会引起一系列问题。解决此问题的一种方法是:

TCP接收端

  1. 写入当前流。
  2. 如果流超过一定长度,则移动到新流。

反序列化端

  1. 从当前流中读取。
  2. 一旦清空了流,就销毁它。

TCP接收端非常简单。反序列化端将需要一些基本的缓冲区拼接逻辑(记得使用Buffer.BlockCopy而不是Array.Copy)。

附注:听起来像一个有趣的项目,如果我有时间并且记得,我可能会实现这个系统。


Jonathan,我不明白解析部分消息如何解决你提到的安全问题,因为解析后的对象很可能比原始字节更大 - 这样攻击实际上会变得更有效率。就目前而言,我的消息已经有一个大小限制了,并且大小在头部中,因此无效的消息可以早期检测出来。 - Kylotan
@Kylotan - 向服务器发送昂贵的消息同样有效,即使您限制了可以发送的消息的总大小,也可以发送需要执行20秒的消息,然后排队一堆其他消息:无论是有意还是无意。 - Jonathan Dickinson
当然可以,但我没有看到任何内在的原因,解析这些消息一块一块地会更便宜。 - Kylotan
这正在变成一场讨论。如果你没有戴上安全帽,那么不去做也没关系。 - Jonathan Dickinson
Buffer.BlockCopy 所谓的速度优势已经被 揭穿 - Edward Brey
显示剩余3条评论

0

这里只有三个回答提供了代码。 其中一个笨拙,其他的都没有回答问题。

这是一个你可以直接复制粘贴的类:

/// <summary>
/// This class is a very fast and threadsafe FIFO buffer
/// </summary>
public class FastFifo
{
    private List<Byte> mi_FifoData = new List<Byte>();

    /// <summary>
    /// Get the count of bytes in the Fifo buffer
    /// </summary>
    public int Count
    {
        get 
        { 
            lock (mi_FifoData)
            {
                return mi_FifoData.Count; 
            }
        }
    }

    /// <summary>
    /// Clears the Fifo buffer
    /// </summary>
    public void Clear()
    {
        lock (mi_FifoData)
        {
            mi_FifoData.Clear();
        }
    }

    /// <summary>
    /// Append data to the end of the fifo
    /// </summary>
    public void Push(Byte[] u8_Data)
    {
        lock (mi_FifoData)
        {
            // Internally the .NET framework uses Array.Copy() which is extremely fast
            mi_FifoData.AddRange(u8_Data);
        }
    }

    /// <summary>
    /// Get data from the beginning of the fifo.
    /// returns null if s32_Count bytes are not yet available.
    /// </summary>
    public Byte[] Pop(int s32_Count)
    {
        lock (mi_FifoData)
        {
            if (mi_FifoData.Count < s32_Count)
                return null;

            // Internally the .NET framework uses Array.Copy() which is extremely fast
            Byte[] u8_PopData = new Byte[s32_Count];
            mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count);
            mi_FifoData.RemoveRange(0, s32_Count);
            return u8_PopData;
        }
    }

    /// <summary>
    /// Gets a byte without removing it from the Fifo buffer
    /// returns -1 if the index is invalid
    /// </summary>
    public int PeekAt(int s32_Index)
    {
        lock (mi_FifoData)
        {
            if (s32_Index < 0 || s32_Index >= mi_FifoData.Count)
                return -1;

            return mi_FifoData[s32_Index];
        }
    }
}

仅供记录,此实现将所有写入它的字节打包成对象,以便可以将它们添加到集合mi_FifoData中。考虑到这一点,如果不是致命的话,这个类的性能和内存占用将会非常糟糕。 - founderio

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接