TcpClient.GetStream().DataAvailable返回false,但流中仍有更多数据。

12
因此,似乎阻塞式Read()在接收所有发送给它的数据之前就返回了。因此我们使用一个由所涉及的流中的DataAvailable值控制的循环来包装Read()。问题在于,在这个while循环中,您可能会收到更多的数据,但是没有任何后台处理让系统知道这一点。我在网上找到的大多数解决方案都无法适用于我的情况。
最终我采取的做法是,在每次从流中读取块后,在我的循环的最后一步中执行简单的Thread.Sleep(1)。这似乎给系统留出了时间更新,我得到了准确的结果,但这似乎有点不正当,并且对于解决方案来说有相当多的“特殊情况”。
下面是我正在处理的情况列表:单个TCP连接,连接了一个IIS应用程序和一个单独的应用程序,两者均以C#编写,用于发送/接收通信。它发送请求然后等待响应。这个请求是由HTTP请求发起的,但我在从HTTP请求中读取数据后没有遇到这个问题,而是在之后。
以下是处理传入连接的基本代码。
protected void OnClientCommunication(TcpClient oClient)
{
    NetworkStream stream = oClient.GetStream();
    MemoryStream msIn = new MemoryStream();

    byte[] aMessage = new byte[4096];
    int iBytesRead = 0;

    while ( stream.DataAvailable )
    {
        int iRead = stream.Read(aMessage, 0, aMessage.Length);
        iBytesRead += iRead;
        msIn.Write(aMessage, 0, iRead);
        Thread.Sleep(1);
    }
    MemoryStream msOut = new MemoryStream();

    // .. Do some processing adding data to the msOut stream

    msOut.WriteTo(stream);
    stream.Flush();

    oClient.Close();
}

欢迎提供任何反馈,以获得更好的解决方案,或者简单点赞确认需要在检查DataAvailable值之前进行Sleep(1)操作,以确保正确更新。

我想猜测,经过两年后,希望这个问题的答案不是仍然如此 :)


你最终实现了什么?你还是使用了线程休眠吗?我有一个类似的问题:http://stackoverflow.com/questions/36731247/c-sharp-networkstream-dataavailable-seems-to-be-unreliable - joelc
1
是的,这就是这些库的工作方式。它们需要被给予时间来完全验证传入的数据。 - James
8个回答

12

你需要知道需要读取多少数据。不能简单地循环读取数据,直到没有更多数据为止,因为你无法确定是否还会有更多数据到来。

这就是为什么HTTP GET结果在HTTP头中具有字节计数的原因:以便客户端在接收完所有数据时知道。

根据你是否控制另一方发送的内容,以下是两种解决方案:

  1. 使用“帧”字符:(SB)data(EB),其中SB和EB是开始块和结束块字符(由你选择),但它们不能出现在数据内部。当你“看到”EB时,就知道你完成了。

  2. 在每个消息前实现一个长度字段,表示后面有多少数据:(len)data。先读取(len),然后读取(len)个字节;如有必要,重复该过程。

这不像从文件中读取数据那样,其中零长度读取意味着数据结束(这确实意味着另一方已断开连接,但这是另一件事情)。

第三种(不推荐)解决方案是实现计时器。一旦开始接收数据,请设置计时器。如果接收循环闲置了一段时间(如果数据不经常到达,则为几秒钟),则可以认为不再有更多数据。这种方法是最后的手段……它不太可靠,难以调整,而且很脆弱。


1
这与我们在引发这个问题的产品中所采用的方案非常接近。只是由于C#和托管库非常简单易用,我希望它也能提供一个漂亮的界面。我的意思是,如果只有一种方法可以做事情,为什么不提供一个API来完成呢? - James

11

我看到了一个问题。
你期望通信速度比while()循环快,这是非常不可能的。
只要没有更多数据,while()循环就会结束,而在它退出几毫秒后,这种情况可能并不成立。

你期望有一定量的字节吗?
OnClientCommunication()被触发的频率是多少?由谁触发?

while()循环之后你会怎么处理数据?会继续将其附加到以前的数据上吗?

DataAvailable 返回false,因为你读取数据的速度比通信速度快,所以只有当你不断回到这个代码块来处理进来的更多数据时才可以正常工作。


你提到的问题正是我的问题,我正在寻找比while循环中的Thread.Sleep(1)更好的解决方案 :) 我不希望设置一定量的数据,超过40KB就足以导致它跳过其余的数据。OnClientCommunication经常被调用,并且在执行时由IIS Web应用程序调用。这旨在成为一次性交易,读取所有数据,处理它们,返回结果。 - James
好的,已经过去了一年和1000次浏览,没有更新,所以我只是会接受这个答案。 - James
1
@James,这不是接受答案的好理由,对于需要/使用此信息的其他人来说有点欺骗性。 - Jeff LaFay
@jlafay 对不起,但实际上答案就是事情是这样的。你需要在while循环中使用sleep()函数,这样数据缓冲区才能像原始问题中所示一样被填充。如果有任何更改,那么可以选择新的答案,但目前情况就是这样。 - James

2
我试图在从网络流中读取数据之前检查DataAvailable,但它会返回false,尽管读取一个字节后它会返回true。因此,我查看了MSDN文档,并且他们在检查之前也进行了读取。我会将while循环重新排列为do while循环以遵循这个模式。

http://msdn.microsoft.com/en-us/library/system.net.sockets.networkstream.dataavailable.aspx

        // Check to see if this NetworkStream is readable. 
        if(myNetworkStream.CanRead){
            byte[] myReadBuffer = new byte[1024];
            StringBuilder myCompleteMessage = new StringBuilder();
            int numberOfBytesRead = 0;

            // Incoming message may be larger than the buffer size. 
            do{
                 numberOfBytesRead = myNetworkStream.Read(myReadBuffer, 0, myReadBuffer.Length);

                 myCompleteMessage.AppendFormat("{0}", Encoding.ASCII.GetString(myReadBuffer, 0, numberOfBytesRead));

            }
            while(myNetworkStream.DataAvailable);

            // Print out the received message to the console.
            Console.WriteLine("You received the following message : " +
                                         myCompleteMessage);
        }
        else{
             Console.WriteLine("Sorry.  You cannot read from this NetworkStream.");
        }

我遇到了与原问题描述相同的问题。但是根据@Despertar的建议,我认为检查CanRead或CanWrite是无用的,因为这两个属性在NetworkStream创建时设置为默认值True,这意味着只要创建NetworkStream,它们就始终为真。 - pixel

2
当我有以下代码时:
    var readBuffer = new byte[1024];
    using (var memoryStream = new MemoryStream())
    {
        do
        {
            int numberOfBytesRead = networkStream.Read(readBuffer, 0, readBuffer.Length);
            memoryStream.Write(readBuffer, 0, numberOfBytesRead);
        }
        while (networkStream.DataAvailable);
    }

根据我的观察:

  • 当发送者发送1000个字节并且读取器想要读取它们时,我怀疑NetworkStream“知道”它应该接收1000个字节。
  • 当我在NetworkStream中没有任何数据到达之前调用.Read时,.Read应该阻塞直到获取超过0个字节(如果networkStream上的.NoDelay为false,则获取更多字节)
  • 然后,当我读取第一批数据时,我怀疑.Read会从其结果中更新这1000个字节的计数器,并且在此之前,我怀疑.DataAvailable设置为false。在更新计数器之前,如果计数器数据小于1000个字节,则.DataAvailable将设置为正确的值。如果仔细思考,这是有意义的。否则,在检查到1000个字节到达之前,它会进入下一个循环,并且.Read方法将无限期地阻塞,因为读取器可能已经读取了1000个字节,而不会再有更多的数据到达。
  • 这就是失败的关键点,正如James所说:

是的,这只是这些库运行的方式。他们需要被给予时间来完全验证传入的数据。- James Apr 20 '16 at 5:24

  • 我怀疑在.Read结束之后访问.DataAvailable之前的内部计数器更新不是原子操作(事务),因此TcpClient需要更多时间来正确设置DataAvailable。

当我有这段代码时:

    var readBuffer = new byte[1024];
    using (var memoryStream = new MemoryStream())
    {
        do
        {
            int numberOfBytesRead = networkStream.Read(readBuffer, 0, readBuffer.Length);
            memoryStream.Write(readBuffer, 0, numberOfBytesRead);

            if (!networkStream.DataAvailable)
                System.Threading.Thread.Sleep(1); //Or 50 for non-believers ;)
        }
        while (networkStream.DataAvailable);
    }

然后,NetworkStream有足够的时间来正确设置.DataAvailable,这个方法应该可以正常工作。

有趣的事实是...这似乎与操作系统版本有关。因为第一个没有sleep的函数在Win XP和Win 10上可以工作,但在Win 7上无法接收到全部1000字节。别问我为什么,但我进行了充分的测试,很容易重现这个问题。


2
这很可能是由于操作系统及其线程/进程执行与套接字扫描例程的性质所致。绝对不要依赖它。深入了解网络通常会让你想知道为什么系统在过去几十年中没有太多进展。 - James

0
使用do-while循环。这将确保内存流指针已移动。第一个Read或ReadAsync将导致内存流指针移动,然后“ .DataAvailable”属性将继续返回true,直到我们到达流的末尾。
来自Microsoft文档的示例:
//检查此NetworkStream是否可读。 if(myNetworkStream.CanRead){ byte[] myReadBuffer = new byte[1024]; StringBuilder myCompleteMessage = new StringBuilder(); int numberOfBytesRead = 0;
//传入的消息可能比缓冲区大小大。 do{ numberOfBytesRead = myNetworkStream.Read(myReadBuffer, 0, myReadBuffer.Length);
myCompleteMessage.AppendFormat("{0}", Encoding.ASCII.GetString(myReadBuffer, 0, numberOfBytesRead)); } while(myNetworkStream.DataAvailable);
//将接收到的消息打印到控制台。 Console.WriteLine("You received the following message : " + myCompleteMessage); } else{ Console.WriteLine("Sorry. You cannot read from this NetworkStream."); } 原始Microsoft文档

0
public class NetworkStream
{
    private readonly Socket m_Socket;

    public NetworkStream(Socket socket)
    {
        m_Socket = socket ?? throw new ArgumentNullException(nameof(socket));
    }

    public void Send(string message)
    {
        if (message is null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        byte[] data = Encoding.UTF8.GetBytes(message);
        SendInternal(data);
    }

    public string Receive()
    {
        byte[] buffer = ReceiveInternal();
        string message = Encoding.UTF8.GetString(buffer);
        return message;
    }

    private void SendInternal(byte[] message)
    {
        int size = message.Length;

        if (size == 0)
        {
            m_Socket.Send(BitConverter.GetBytes(size), 0, sizeof(int), SocketFlags.None);
        }
        else
        {
            m_Socket.Send(BitConverter.GetBytes(size), 0, sizeof(int), SocketFlags.None);
            m_Socket.Send(message, 0, size, SocketFlags.None);
        }
    }

    private byte[] ReceiveInternal()
    {
        byte[] sizeData = CommonReceiveMessage(sizeof(int));
        int size = BitConverter.ToInt32(sizeData);

        if (size == 0)
        {
            return Array.Empty<byte>();
        }

        return CommonReceiveMessage(size);
    }

    private byte[] CommonReceiveMessage(int messageLength)
    {
        if (messageLength < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(messageLength), messageLength, "Размер сообщения не может быть меньше нуля.");
        }

        if (messageLength == 0)
        {
            return Array.Empty<byte>();
        }

        byte[] buffer = new byte[m_Socket.ReceiveBufferSize];
        int currentLength = 0;
        int receivedDataLength;

        using (MemoryStream memoryStream = new())
        {
            do
            {
                receivedDataLength = m_Socket.Receive(buffer, 0, m_Socket.ReceiveBufferSize, SocketFlags.None);
                currentLength += receivedDataLength;
                memoryStream.Write(buffer, 0, receivedDataLength);
            }
            while (currentLength < messageLength);

            return memoryStream.ToArray();
        }
    }
}

这个答案缺少一些代码解释,但是看起来接收部分读取前4个字节以确定后面将跟随多少字节。一旦接收到这些字节数,接收就被认为是完成的。因此,您需要在某种程度上达成共识,以确定应该期望多少字节,以确定接收到的消息是否完整。 - Lies

0

这个例子介绍了一个发送和接收数据的算法,即文本消息。您也可以发送文件。

using System;
using System.IO;
using System.Net.Sockets;
using System.Text;

namespace Network
{
    /// <summary>
    /// Represents a network stream for transferring data.
    /// </summary>
    public class NetworkStream
    {
        #region Fields
        private static readonly byte[] EmptyArray = Array.Empty<byte>();
        private readonly Socket m_Socket;
        #endregion

        #region Constructors
        /// <summary>
        /// Initializes a new instance of the class <seealso cref="NetworkStream"/>.
        /// </summary>
        /// <param name="socket">
        /// Berkeley socket interface.
        /// </param>
        public NetworkStream(Socket socket)
        {
            m_Socket = socket ?? throw new ArgumentNullException(nameof(socket));
        }
        #endregion

        #region Properties

        #endregion

        #region Methods
        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="message">
        /// Message text.
        /// </param>
        /// <exception cref="ArgumentNullException"/>
        public void Send(string message)
        {
            if (message is null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            byte[] data = Encoding.UTF8.GetBytes(message);
            Write(data);
        }

        /// <summary>
        /// Receives the sent message.
        /// </summary>
        /// <returns>
        /// Sent message.
        /// </returns>
        public string Receive()
        {
            byte[] data = Read();
            return Encoding.UTF8.GetString(data);
        }

        /// <summary>
        /// Receives the specified number of bytes from a bound <seealso cref="Socket"/>.
        /// </summary>
        /// <param name="socket">
        /// <seealso cref="Socket"/> for receiving data.
        /// </param>
        /// <param name="size">
        /// The size of the received data.
        /// </param>
        /// <returns>
        /// Returns an array of received data.
        /// </returns>
        private byte[] Read(int size)
        {
            if (size < 0)
            {
                // You can throw an exception.
                return null;
            }

            if (size == 0)
            {
                // Don't throw an exception here, just return an empty data array.
                return EmptyArray;
            }

            // There are many examples on the Internet where the
            // Socket.Available property is used, this is WRONG!

            // Important! The Socket.Available property is not working as expected.
            // Data packages may be in transit, but the Socket.Available property may indicate otherwise.
            // Therefore, we use a counter that will allow us to receive all data packets, no more and no less.
            // The cycle will continue until we receive all the data packets or the timeout is triggered.

            // Note. This algorithm is not designed to work with big data.

            SimpleCounter counter = new(size, m_Socket.ReceiveBufferSize);
            byte[] buffer = new byte[counter.BufferSize];
            int received;

            using MemoryStream storage = new();

            // The cycle will run until we get all the data.
            while (counter.IsExpected)
            {
                received = m_Socket.Receive(buffer, 0, counter.Available, SocketFlags.None);
                // Pass the size of the received data to the counter.
                counter.Count(received);
                // Write data to memory.
                storage.Write(buffer, 0, received);
            }

            return storage.ToArray();
        }

        /// <summary>
        /// Receives the specified number of bytes from a bound <seealso cref="Socket"/>.
        /// </summary>
        /// <returns>
        /// Returns an array of received data.
        /// </returns>
        private byte[] Read()
        {
            byte[] sizeData;
            // First, we get the size of the master data.
            sizeData = Read(sizeof(int));
            // We convert the received data into a number.
            int size = BitConverter.ToInt32(sizeData);

            // If the data size is less than 0 then throws an exception.
            // We inform the recipient that an error occurred while reading the data.

            if (size < 0)
            {
                // Or return the value null.
                throw new SocketException();
            }

            // If the data size is 0, then we will return an empty array.
            // Do not allow an exception here.

            if (size == 0)
            {
                return EmptyArray;
            }

            // Here we read the master data.
            byte[] data = Read(size);
            return data;
        }

        /// <summary>
        /// Writes data to the stream.
        /// </summary>
        /// <param name="data"></param>
        private void Write(byte[] data)
        {
            if (data is null)
            {
                // Throw an exception.
                // Or send a negative number that will represent the value null.
                throw new ArgumentNullException(nameof(data));
            }

            byte[] sizeData = BitConverter.GetBytes(data.Length);

            // In any case, we inform the recipient about the size of the data.
            m_Socket.Send(sizeData, 0, sizeof(int), SocketFlags.None);

            if (data.Length != 0)
            {
                // We send data whose size is greater than zero.
                m_Socket.Send(data, 0, data.Length, SocketFlags.None);
            }
        }
        #endregion

        #region Classes
        /// <summary>
        /// Represents a simple counter of received data over the network.
        /// </summary>
        private class SimpleCounter
        {
            #region Fields
            private int m_Received;
            private int m_Available;
            private bool m_IsExpected;
            #endregion

            #region Constructors
            /// <summary>
            /// Initializes a new instance of the class <seealso cref="SimpleCounter"/>.
            /// </summary>
            /// <param name="dataSize">
            /// Data size.
            /// </param>
            /// <param name="bufferSize">
            /// Buffer size.
            /// </param>
            /// <exception cref="ArgumentOutOfRangeException"/>
            public SimpleCounter(int dataSize, int bufferSize)
            {
                if (dataSize < 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(dataSize), dataSize, "Data size cannot be less than 0");
                }

                if (bufferSize < 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(dataSize), bufferSize, "Buffer size cannot be less than 0");
                }

                DataSize = dataSize;
                BufferSize = bufferSize;

                // Update the counter data.
                UpdateCounter();
            }
            #endregion

            #region Properties
            /// <summary>
            /// Returns the size of the expected data.
            /// </summary>
            /// <value>
            /// Size of expected data.
            /// </value>
            public int DataSize { get; }

            /// <summary>
            /// Returns the size of the buffer.
            /// </summary>
            /// <value>
            /// Buffer size.
            /// </value>
            public int BufferSize { get; }

            /// <summary>
            /// Returns the available buffer size for receiving data.
            /// </summary>
            /// <value>
            /// Available buffer size.
            /// </value>
            public int Available
            {
                get
                {
                    return m_Available;
                }
            }

            /// <summary>
            /// Returns a value indicating whether the thread should wait for data.
            /// </summary>
            /// <value>
            /// <see langword="true"/> if the stream is waiting for data; otherwise, <see langword="false"/>.
            /// </value>
            public bool IsExpected
            {
                get
                {
                    return m_IsExpected;
                }
            }
            #endregion

            #region Methods
            // Updates the counter.
            private void UpdateCounter()
            {
                int unreadDataSize = DataSize - m_Received;
                m_Available = unreadDataSize < BufferSize ? unreadDataSize : BufferSize;
                m_IsExpected = m_Available > 0;
            }

            /// <summary>
            /// Specifies the size of the received data.
            /// </summary>
            /// <param name="bytes">
            /// The size of the received data.
            /// </param>
            public void Count(int bytes)
            {
                // NOTE: Counter cannot decrease.

                if (bytes > 0)
                {
                    int received = m_Received += bytes;
                    // NOTE: The value of the received data cannot exceed the size of the expected data.
                    m_Received = (received < DataSize) ? received : DataSize;

                    // Update the counter data.
                    UpdateCounter();
                }
            }

            /// <summary>
            /// Resets counter data.
            /// </summary>
            public void Reset()
            {
                m_Received = 0;
                UpdateCounter();
            }
            #endregion
        }
        #endregion
    }
}

0
使用TcpClient.Available将允许此代码每次读取可用的数据。当剩余要读取的数据量大于或等于TcpClient.ReceiveBufferSize时,TcpClient.Available会自动设置为TcpClient.ReceiveBufferSize。否则,它将设置为剩余数据的大小。 因此,您可以通过设置TcpClient.ReceiveBufferSize(例如,oClient.ReceiveBufferSize = 4096;)来指示每次读取的最大数据量。
        protected void OnClientCommunication(TcpClient oClient)
        {
            NetworkStream stream = oClient.GetStream();
            MemoryStream msIn = new MemoryStream();

            byte[] aMessage;
            oClient.ReceiveBufferSize = 4096;
            int iBytesRead = 0;

            while (stream.DataAvailable)
            {
                int myBufferSize = (oClient.Available < 1) ? 1 : oClient.Available;
                aMessage = new byte[oClient.Available];

                int iRead = stream.Read(aMessage, 0, aMessage.Length);
                iBytesRead += iRead;
                msIn.Write(aMessage, 0, iRead);
            }
            MemoryStream msOut = new MemoryStream();

            // .. Do some processing adding data to the msOut stream

            msOut.WriteTo(stream);
            stream.Flush();

            oClient.Close();
        }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接