使用 int NetworkStream.Read(Span<Bytes>) 接收完整的网络数据流

4

正如标题所说,我正在尝试在我的网络项目中使用新的 (C# 8.0) 对象(Span)。在以前的实现中,我学到了在尝试使用其内容之前,必须确保 NetworkStream 已接收到完整的缓冲区,否则依赖于连接的状态,接收到的数据可能不完整。

while (true)
{
  while (!stream.DataAvailable)
  Thread.Sleep(10);

  int received = 0;
  byte[] response = new byte[consumerBufferSize];

  //Loop that forces the stream to read all incoming data before using it
  while (received < consumerBufferSize) 
    received += stream.Read(response, received, consumerBufferSize - received);

  string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response);
  consumerAction(this, message);
}

然而,现在引入了一种不同的读取网络流数据的方法(Read(Span))。并且假设 stackalloc 能够提高性能,我正在尝试将我的旧实现迁移到这种方法。现在它的样子如下:

while (true)
{
  while (!stream.DataAvailable)
    Thread.Sleep(10);

  Span<byte> response = stackalloc byte[consumerBufferSize];

  stream.Read(response);

  string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response).Split('|');
  consumerAction(this, message);
}

但是现在,既然它不提供我之前使用的方法,那么我如何确保缓冲区已经完全读取呢?

编辑:

//Former methodd
int Read (byte[] buffer, int offset, int size);
//The one I am looking for
int Read (Span<byte> buffer, int offset, int size);

请尝试使用 Span<T>.Length - aepot
由于没有像Read(byte [] buffer,int offset,int size)这样的重载功能,它如何帮助我? - Carlos Henrique
抱歉,我不明白你的意思。它究竟如何有助于解决问题?你能给我提供一个例子吗? - Carlos Henrique
Span 不是一个数组,它是数组的一个范围,已经包含了有关范围偏移量和长度的信息。请查看此构造函数。因此,您可以将其传递给 ctor 而不是 Read 方法。 - aepot
1
谢谢您的反馈。我认为Peter给了我一些有用的见解,让我更明白您的意思。 - Carlos Henrique
2个回答

3
我不确定我理解你的问题。在使用 Span<byte> 时,第一个代码示例中依赖的所有相同特性仍然存在。 Read(Span<byte>) 重载仍然返回读取的字节数。由于 Span<byte> 不是缓冲区本身,而只是缓冲区的窗口,因此您可以更新 Span<byte> 的值以指示读取其他数据的新起始点。拥有读取的字节数并能够指定下一次读取的偏移量,这些就足以复制旧示例中的功能了。当然,您当前没有保存原始缓冲区引用的任何代码;您还需要添加它。
我预计像这样的代码可以正常工作:
while (true)
{
  while (!stream.DataAvailable)
    Thread.Sleep(10);

  byte* response = stackalloc byte[consumerBufferSize];

  while (received < consumerBufferSize) 
  {
    Span<byte> span = new Span<byte>(response, received, consumerBufferSize - received);

    received += stream.Read(span);
  }

  // process response here...
}

请注意,由于stackalloc的工作原理,这需要使用unsafe代码。您只能通过每次使用Span<T>并分配新块来避免这种情况。当然,这最终将耗尽所有堆栈。
既然在您的实现中显然要将一个线程用于此无限循环,我不认为stackalloc有什么帮助。你可以一样在堆中分配一个长期存在的缓冲数组来使用。
换句话说,我真的不明白这比使用常规托管数组的原始 Read(byte[], int, int) 重载要好在哪里。但上面是让代码正常工作的方法。
顺便说一下: 你应该学习异步API的工作方式。既然已经使用了NetworkStreamasync/await模式是自然匹配的。而且无论您使用哪个API,检查DataAvailable的循环都很糟糕。别那么做了。由于Read()方法已经是一个阻塞方法,您不需要等待数据在一个单独的循环中显示,因为Read()方法不会返回,直到有数据出现。

2
我只是添加了一些额外的信息。
你所谈论的函数具有以下描述。
public override int Read (Span<byte> buffer);

源文翻译如下:

(来源:https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.networkstream.read?view=net-5.0

返回的 int 是从 NetworkStream 读取的字节数。现在,如果我们查看 Span 函数,我们会发现 Slice 并具有以下描述:

public Span<T> Slice (int start);

这段文本的意思是:
返回我们 Span 的一部分,您可以使用它将您的 stackalloc 的某个部分发送到 NetworkStream 中,而无需使用不安全的代码。
重复使用您的代码,您可以像这样使用:
while (true)
{
    while (!stream.DataAvailable)
        Thread.Sleep(10);

        int received = 0;
        Span<byte> response = stackalloc byte[consumerBufferSize];

        //Loop that forces the stream to read all incoming data before using it
        while (received < consumerBufferSize)
            received += stream.Read(response.Slice(received));

        string[] message = ObjectWrapper.ConvertByteArrayToObject<string>(response).Split('|');
        consumerAction(this, message);
}

简单来说,我们使用Slice创建一个指向我们的stackalloc的初始Span的一部分的新Span,"start"参数允许我们选择从哪里开始这个部分。然后将该部分传递给read函数,它将从我们“开始”Slice的地方开始在我们的缓冲区中写入数据。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接