C#: 实现 NetworkStream.Peek?

18

目前在C#中没有NetworkStream.Peek方法。那么如何实现一个类似于NetworkStream.ReadByte的Peek方法,但返回的byte并不会从Stream中移除?


这是一个很好的问题,但只是因为我很好奇:你为什么想要这样做呢? - Justin R.
2
我需要检查NetworkStream中的字节,以获取决定使用哪种方法/类来处理流的信息,这就是为什么我需要Peek功能。 - Lopper
1
我告诉你为什么我想要这个:如果你想通过SslStream连接客户端和服务器,SslStream.AuthenticateAsClient会在网络上传送客户端hello。而SslStream.AuthenticateAsServer则需要在其参数中指定服务器证书。如果你想支持SNI,则需要先查看NetworkStream字节以确定所请求的服务器名称,然后再在服务器上实例化SslStream类。 - Edward Ned Harvey
5个回答

11
我遇到了与“查找幻数,然后决定将流发送到哪个流处理器”的相同要求,但不幸的是,我无法通过将已经消耗的字节作为单独参数传递到流处理方法中来摆脱这个问题,因为那些方法已经存在,并且它们只接受System.IO.Stream而不接受其他任何内容,正如在Aaronaught的答案评论中建议的那样。
我通过创建一个或多或少通用的PeekableStream类来解决这个问题,该类包装了一个Stream。它适用于NetworkStream,但也适用于任何其他Stream,只要该流可读(即Stream.CanRead为true)。

编辑

或者,您可以使用全新的ReadSeekableStream并执行以下操作:

// Usage sketch (placeholders are left as "..."): wrap the network stream, read `count` bytes,
// then seek back by the same amount so the bytes can be read again.
// NOTE(review): ReadSeekableStream is not defined here; the second constructor argument is
// presumably the size of the seek-back buffer and must be >= count - confirm against its source.
var readSeekableStream = new ReadSeekableStream(networkStream, /* >= */ count);
...
readSeekableStream.Read(..., count);
readSeekableStream.Seek(-count, SeekOrigin.Current);

无论如何,这里来了一个PeekableStream

/// <summary>
/// PeekableStream wraps a Stream and can be used to peek ahead in the underlying stream,
/// without consuming the bytes. In other words, doing Peek() will allow you to look ahead in the stream,
/// but it won't affect the result of subsequent Read() calls.
///
/// This is sometimes necessary, e.g. for peeking at the magic number of a stream of bytes and decide which
/// stream processor to hand over the stream.
/// </summary>
public class PeekableStream : Stream
{
    private readonly Stream underlyingStream;

    // Holds bytes that were already read from underlyingStream by Peek() but not yet handed out by Read().
    private readonly byte[] lookAheadBuffer;

    // Number of valid, not-yet-consumed bytes stored at the front of lookAheadBuffer.
    private int lookAheadIndex;

    /// <summary>
    /// Creates a peekable wrapper around <paramref name="underlyingStream"/>.
    /// </summary>
    /// <param name="underlyingStream">The stream to wrap. It is disposed together with this instance.</param>
    /// <param name="maxPeekBytes">The maximum number of bytes that can be peeked ahead at any time.</param>
    /// <exception cref="ArgumentNullException"><paramref name="underlyingStream"/> is null.</exception>
    public PeekableStream(Stream underlyingStream, int maxPeekBytes)
    {
        if (underlyingStream == null)
        {
            throw new ArgumentNullException("underlyingStream");
        }

        this.underlyingStream = underlyingStream;
        lookAheadBuffer = new byte[maxPeekBytes];
    }

    /// <summary>
    /// Disposes the wrapped stream when called from Dispose().
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            underlyingStream.Dispose();
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Peeks at a maximum of count bytes, or less if the stream ends before that number of bytes can be read.
    ///
    /// Calls to this method do not influence subsequent calls to Read() and Peek().
    ///
    /// Please note that this method will always peek count bytes unless the end of the stream is reached before that - in contrast to the Read()
    /// method, which might read less than count bytes, even though the end of the stream has not been reached.
    /// </summary>
    /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte array with the values between offset and
    /// (offset + number-of-peeked-bytes - 1) replaced by the bytes peeked from the current source.</param>
    /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data peeked from the current stream.</param>
    /// <param name="count">The maximum number of bytes to be peeked from the current stream.</param>
    /// <returns>The total number of bytes peeked into the buffer. If it is less than the number of bytes requested then the end of the stream has been reached.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative,
    /// or <paramref name="count"/> exceeds the peekable size given to the constructor.</exception>
    public virtual int Peek(byte[] buffer, int offset, int count)
    {
        // Validate before touching the underlying stream so that a bad call has no side effects.
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }

        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }

        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        if (count > lookAheadBuffer.Length)
        {
            throw new ArgumentOutOfRangeException("count", "must be smaller than peekable size, which is " + lookAheadBuffer.Length);
        }

        // Top up the look-ahead buffer until it holds count bytes or the underlying stream ends.
        while (lookAheadIndex < count)
        {
            int bytesRead = underlyingStream.Read(lookAheadBuffer, lookAheadIndex, count - lookAheadIndex);

            if (bytesRead == 0) // end of stream reached
            {
                break;
            }

            lookAheadIndex += bytesRead;
        }

        int peeked = Math.Min(count, lookAheadIndex);
        Array.Copy(lookAheadBuffer, 0, buffer, offset, peeked);
        return peeked;
    }

    public override bool CanRead { get { return true; } }

    /// <summary>
    /// Gets or sets the logical position, i.e. the underlying position minus the bytes that
    /// were peeked but not yet consumed. Setting it discards any peeked bytes.
    /// </summary>
    public override long Position
    {
        get
        {
            return underlyingStream.Position - lookAheadIndex;
        }
        set
        {
            underlyingStream.Position = value;
            lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Position, as that might throw NotSupportedException,
                                // in which case we don't want to change the lookAhead status
        }
    }

    /// <summary>
    /// Reads up to count bytes, serving previously peeked bytes first and then reading from the underlying stream.
    /// </summary>
    /// <returns>The number of bytes written to buffer; 0 only if count is 0 or the end of the stream was reached.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        int bytesTakenFromLookAheadBuffer = 0;
        if (count > 0 && lookAheadIndex > 0)
        {
            bytesTakenFromLookAheadBuffer = Math.Min(count, lookAheadIndex);
            Array.Copy(lookAheadBuffer, 0, buffer, offset, bytesTakenFromLookAheadBuffer);
            count -= bytesTakenFromLookAheadBuffer;
            offset += bytesTakenFromLookAheadBuffer;
            lookAheadIndex -= bytesTakenFromLookAheadBuffer;
            if (lookAheadIndex > 0)
            {
                // Move the remaining bytes to the front. They start right after the bytes just consumed,
                // i.e. at index bytesTakenFromLookAheadBuffer. Array.Copy handles overlapping ranges
                // within the same array as if the source had been saved to a temporary location first.
                Array.Copy(lookAheadBuffer, bytesTakenFromLookAheadBuffer, lookAheadBuffer, 0, lookAheadIndex);
            }
        }

        return count > 0
            ? bytesTakenFromLookAheadBuffer + underlyingStream.Read(buffer, offset, count)
            : bytesTakenFromLookAheadBuffer;
    }

    /// <summary>
    /// Reads a single byte, serving a previously peeked byte first if there is one.
    /// </summary>
    /// <returns>The byte cast to an int, or whatever the underlying ReadByte() returns when nothing is buffered.</returns>
    public override int ReadByte()
    {
        if (lookAheadIndex > 0)
        {
            lookAheadIndex--;
            byte firstByte = lookAheadBuffer[0];
            if (lookAheadIndex > 0) // move remaining bytes in lookAheadBuffer to front
            {
                Array.Copy(lookAheadBuffer, 1, lookAheadBuffer, 0, lookAheadIndex);
            }

            return firstByte;
        }

        return underlyingStream.ReadByte();
    }

    /// <summary>
    /// Seeks in the underlying stream and discards any peeked bytes.
    /// </summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        // The underlying stream is lookAheadIndex bytes ahead of the logical position reported by
        // Position, so a relative seek has to be corrected by that amount.
        if (origin == SeekOrigin.Current)
        {
            offset -= lookAheadIndex;
        }

        long ret = underlyingStream.Seek(offset, origin);
        lookAheadIndex = 0; // this needs to be done AFTER the call to underlyingStream.Seek(), as that might throw NotSupportedException,
                            // in which case we don't want to change the lookAhead status
        return ret;
    }

    // from here on, only simple delegations to underlyingStream

    public override bool CanSeek { get { return underlyingStream.CanSeek; } }
    public override bool CanWrite { get { return underlyingStream.CanWrite; } }
    public override bool CanTimeout { get { return underlyingStream.CanTimeout; } }
    public override int ReadTimeout { get { return underlyingStream.ReadTimeout; } set { underlyingStream.ReadTimeout = value; } }
    public override int WriteTimeout { get { return underlyingStream.WriteTimeout; } set { underlyingStream.WriteTimeout = value; } }
    public override void Flush() { underlyingStream.Flush(); }
    public override long Length { get { return underlyingStream.Length; } }
    public override void SetLength(long value) { underlyingStream.SetLength(value); }
    public override void Write(byte[] buffer, int offset, int count) { underlyingStream.Write(buffer, offset, count); }
    public override void WriteByte(byte value) { underlyingStream.WriteByte(value); }
}

1
@Thomas,你看过源代码了吗?在实现Peek()时,我没有使用Seek(),我唯一使用Seek()的地方就是让PeekableStream的用户能够委托给底层流。 - Evgeniy Berezovsky
我的错误,我误解了你的代码在做什么... 这是一个很好的解决方案,+1! - Thomas Levesque
1
谢谢!有没有可能添加一个ReadAsync方法? - deerchao
1
我认为在使用SeekOrigin.Current调用Seek方法的委托时可能存在一个错误,应该有一行代码像这样:if (origin == SeekOrigin.Current) offset -= lookAheadIndex; - Steve

9
如果您不需要实际检索字节,则可以引用DataAvailable属性。
否则,您可以使用StreamReader包装它并调用其Peek方法。
请注意,这两者都不是从网络流中读取特别可靠的,由于延迟问题。数据可能在您查看之后的瞬间变为可用(存在于读取缓冲区中)。
我不确定您打算用它做什么,但是NetworkStream上的Read方法是阻塞调用,因此即使您以块接收,也不需要检查状态。如果您正在尝试在从流中读取时保持应用程序响应性,则应使用线程或异步调用来接收数据。
编辑:根据此帖,StreamReader.Peek在NetworkStream上存在缺陷,或者至少具有未记录的行为,因此如果您选择这种方法,请小心。

更新 - 回应评论

实际上,在流本身上进行“窥视”是不可能的;它只是一个流,一旦接收到字节,它就不再在流中。一些流支持寻址,因此您可以从技术上重新读取该字节,但NetworkStream不支持。

仅当将流读入缓冲区时,才适用于窥视; 一旦数据在缓冲区中,则轻松进行窥视,因为您只需检查缓冲区中当前位置的内容。这就是为什么StreamReader能够做到这一点的原因; 通常没有Stream类会有自己的Peek方法。

现在,对于这个特定的问题,我质疑是否这真的是正确的答案。我理解动态选择处理流的方法的想法,但你是否真的需要在原始流上这样做?难道你不能先将流读入字节数组,甚至将其复制到MemoryStream中,然后从那一点开始处理吗?

我看到的主要问题是,如果你在读取网络流时发生了什么不好的事情,数据就丢失了。但是,如果你先将其读入临时位置,就可以进行调试。你可以找出数据是什么以及为什么正在尝试处理数据的对象在中途失败。
一般来说,你想要做的第一件事情是将 NetworkStream 读入本地缓冲区。我能想到不这样做的唯一理由是如果你正在读取大量数据 - 即使是这样,我也可能考虑使用文件系统作为中间缓冲区,如果它不适合内存。
我不知道你的确切要求,但从我目前所学到的知识来看,我的建议是:除非有充分的理由这样做,否则不要直接从 NetworkStream 处理数据。考虑先将数据读入内存或磁盘,然后再处理副本。

1
我确实需要检索字节。但是我不希望它从流中被移除,因为该字节包含有关决定使用哪种方法/类进一步处理流的信息。 - Lopper
1
获取字节,决定使用哪种方法/类,然后使用它。您是否需要在决定后仍将字节保留在流中?如果是这样:协议有误 - 为两个目的使用相同的字节。 - John Saunders
1
@John Saunders:我完全同意,但是我曾经处理过这种设备(其协议我无法控制),它们确实会这样做。 前面的几个字节是一个幻数和内容长度,这些又需要用来计算出现在末尾的校验和。所以即使这是一个糟糕的协议,但不幸的是,它也是一个真实存在的协议。 - Aaronaught
那不是问题 - 我以前处理过那个。问题在于消息类型使用了半个字节,另一半是消息的一部分。在这种情况下,你必须窥视。但魔术数字和长度可以被读取并传递给工厂方法,还有流(包含“长度”)更多的字节。 - John Saunders
@John Saunders:确实可以,这也是我建议先将其读入本地缓冲区的原因。另一个选择是仅将标题读入缓冲区,然后将其传递给工厂,我想这就是你所暗示的。我认为我仍然更喜欢前者;直接在“NetworkStream”上进行任何处理可能会非常难以调试... - Aaronaught

5
如果您可以访问Socket对象,可以尝试使用Receive方法,并传递SocketFlags.Peek。这类似于在BSD Sockets或Winsock中传递到recv调用的MSG_PEEK标志。

0

FWIW,这里是一个针对不可寻址流的可窥视流实现,针对仅提前查看一个字节的情况进行了优化:

/// <summary>
/// Wraps a stream and adds a one-byte look-ahead via <see cref="PeekByte"/>.
/// The peeked byte is handed out again by the next Read()/ReadByte() call.
/// </summary>
public class OneBytePeekableStream : Stream
{
    private readonly bool _disposeStreamOnDispose;
    private readonly Stream _stream;
    private int _buffer; // byte or -1
    private int _bufferLength; // 0 or 1

    /// <summary>
    /// Creates the wrapper.
    /// </summary>
    /// <param name="stream">The stream to wrap.</param>
    /// <param name="disposeStreamOnDispose">True to dispose <paramref name="stream"/> together with this instance.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public OneBytePeekableStream(Stream stream, bool disposeStreamOnDispose)
    {
        if (stream == null)
            throw new ArgumentNullException(nameof(stream));

        _stream = stream;
        _disposeStreamOnDispose = disposeStreamOnDispose;
    }

    // Number of bytes the underlying stream is ahead of the logical position: 1 when a real byte
    // is buffered, 0 when nothing is buffered or when the buffered value is the end-of-stream marker (-1).
    private int PendingByteCount => _bufferLength > 0 && _buffer >= 0 ? 1 : 0;

    public override long Length => _stream.Length;
    public override bool CanRead => _stream.CanRead;
    public override bool CanSeek => _stream.CanSeek;
    public override bool CanWrite => _stream.CanWrite;
    public override bool CanTimeout => _stream.CanTimeout;
    public override int ReadTimeout { get => _stream.ReadTimeout; set => _stream.ReadTimeout = value; }
    public override int WriteTimeout { get => _stream.WriteTimeout; set => _stream.WriteTimeout = value; }

    /// <summary>
    /// Gets or sets the logical position. A peeked byte does not count as consumed;
    /// setting the position discards the peeked byte.
    /// </summary>
    public override long Position
    {
        get => _stream.Position - PendingByteCount;
        set
        {
            _stream.Position = value;
            _bufferLength = 0;
        }
    }

    /// <summary>
    /// Reads up to count bytes, returning the peeked byte (if any) first.
    /// </summary>
    /// <returns>The number of bytes read; 0 if count is 0 or the end of the stream was already peeked.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));

        if (offset < 0)
            throw new ArgumentOutOfRangeException(nameof(offset));

        if (count < 0)
            throw new ArgumentOutOfRangeException(nameof(count));

        if (buffer.Length - offset < count)
            throw new ArgumentOutOfRangeException(nameof(count));

        if (count == 0)
            return 0;

        if (_bufferLength == 0)
            return _stream.Read(buffer, offset, count);

        // A previous PeekByte() already saw the end of the stream.
        if (_buffer < 0)
            return 0;

        _bufferLength = 0;
        buffer[offset] = (byte)_buffer;
        if (count == 1)
            return count;

        var read = _stream.Read(buffer, offset + 1, count - 1);
        return read + 1;
    }

    /// <summary>
    /// Returns the next byte without consuming it. This is the sole reason for this class.
    /// </summary>
    /// <returns>The next byte, or -1 if the stream is at its end.</returns>
    public virtual int PeekByte()
    {
        if (_bufferLength > 0)
            return _buffer;

        _buffer = _stream.ReadByte();
        _bufferLength = 1;
        return _buffer;
    }

    /// <summary>
    /// Reads one byte, returning the peeked byte first if there is one.
    /// </summary>
    /// <returns>The byte read, or -1 at the end of the stream.</returns>
    public override int ReadByte()
    {
        if (_bufferLength == 0)
            return _stream.ReadByte();

        if (_buffer < 0)
            return -1;

        _bufferLength = 0;
        return _buffer;
    }

    /// <summary>
    /// Seeks in the underlying stream and discards the peeked byte.
    /// </summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        // After a successful peek the underlying stream is one byte ahead of the logical position,
        // so a relative seek has to be corrected by that amount.
        if (origin == SeekOrigin.Current)
            offset -= PendingByteCount;

        var ret = _stream.Seek(offset, origin);
        _bufferLength = 0; // only after Seek succeeded, so a failed seek keeps the peeked byte
        return ret;
    }

    public override void Flush() => _stream.Flush();
    public override void SetLength(long value) => _stream.SetLength(value);
    public override void WriteByte(byte value) => _stream.WriteByte(value);
    public override void Write(byte[] buffer, int offset, int count) => _stream.Write(buffer, offset, count);

    /// <summary>
    /// Disposes the wrapped stream only if that was requested in the constructor.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            if (_disposeStreamOnDispose)
            {
                _stream.Dispose();
            }
        }

        base.Dispose(disposing);
    }
}

0
这是一个非常简单的PeekStream实现,它允许您仅在流的开头查看一定数量的字节(而不是随时都能查看)。查看的字节本身作为Stream返回,以最小化对现有代码的更改。
以下是使用方法:
// Usage sketch (placeholders are left as "..."): peek at the start of a non-seekable stream,
// parse the peeked bytes through a separate stream, then read everything from the wrapper.
Stream nonSeekableStream = ...;
PeekStream peekStream = new PeekStream(nonSeekableStream, 30); // Peek max 30 bytes
// The peeked bytes are exposed as their own stream, so reading them does not consume peekStream.
Stream initialBytesStream = peekStream.GetInitialBytesStream();
ParseHeaders(initialBytesStream);  // Work on initial bytes of nonSeekableStream
peekStream.Read(...) // Read normally, the read will start from the beginning

GetInitialBytesStream() 返回一个可寻址的流,其中包含底层流的最多 peekSize 个初始字节(如果流短于 peekSize,则少于该值)。

由于其简单性,读取 PeekStream 应该只比直接读取底层流略慢(如果有的话)。

/// <summary>
/// A read-only, non-seekable stream wrapper that captures the first bytes of the wrapped stream
/// at construction time and exposes them separately through <see cref="GetInitialBytesStream"/>.
/// Reading from the PeekStream itself still starts at the very first byte.
/// </summary>
public class PeekStream : Stream
{
    private readonly Stream m_stream;

    // The bytes captured from the start of m_stream; valid range is [0, m_end).
    private readonly byte[] m_buffer;

    // Index of the next captured byte that Read()/ReadByte() will hand out.
    private int m_start;

    // Number of bytes actually captured into m_buffer.
    private int m_end;

    /// <summary>
    /// Wraps <paramref name="stream"/> and immediately reads up to <paramref name="peekSize"/> bytes from it.
    /// </summary>
    /// <remarks>
    /// The capture is a single Read() call on the wrapped stream, so fewer than
    /// <paramref name="peekSize"/> bytes are captured whenever that call returns fewer.
    /// </remarks>
    /// <param name="stream">The readable stream to wrap. It is disposed together with this instance.</param>
    /// <param name="peekSize">The maximum number of initial bytes to capture.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="stream"/> is not readable.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="peekSize"/> is negative.</exception>
    public PeekStream(Stream stream, int peekSize)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }
        if (!stream.CanRead)
        {
            throw new ArgumentException("Stream is not readable.");
        }
        if (peekSize < 0)
        {
            throw new ArgumentOutOfRangeException("peekSize");
        }
        m_stream = stream;
        m_buffer = new byte[peekSize];
        m_end = stream.Read(m_buffer, 0, peekSize);
    }

    public override bool CanRead
    {
        get
        {
            return true;
        }
    }

    public override bool CanWrite
    {
        get
        {
            return false;
        }
    }

    public override bool CanSeek
    {
        get
        {
            return false;
        }
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Returns a new read-only MemoryStream over the captured initial bytes.
    /// Reading from it does not affect what Read()/ReadByte() on this instance return.
    /// </summary>
    public MemoryStream GetInitialBytesStream()
    {
        return new MemoryStream(m_buffer, 0, m_end, false);
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Reads up to count bytes, serving the captured initial bytes first and then the wrapped stream.
    /// </summary>
    /// <returns>The total number of bytes written to buffer.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative,
    /// or the range does not fit into <paramref name="buffer"/>.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        // Validate arguments
        if (buffer == null)
        {
            throw new ArgumentNullException("buffer");
        }
        if (offset < 0)
        {
            throw new ArgumentOutOfRangeException("offset");
        }
        if (count < 0)
        {
            throw new ArgumentOutOfRangeException("count");
        }
        // Written as a subtraction so that large offset/count values cannot overflow.
        if (count > buffer.Length - offset)
        {
            throw new ArgumentOutOfRangeException("count");
        }

        int totalRead = 0;

        // Read from buffer
        if (m_start < m_end)
        {
            int toRead = Math.Min(m_end - m_start, count);
            Array.Copy(m_buffer, m_start, buffer, offset, toRead);
            m_start += toRead;
            offset += toRead;
            count -= toRead;
            totalRead += toRead;
        }

        // Read from stream
        if (count > 0)
        {
            totalRead += m_stream.Read(buffer, offset, count);
        }

        // Return total bytes read
        return totalRead;
    }

    /// <summary>Writing is not available on this stream (CanWrite is false); always throws.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        // NOTE(review): the other unsupported members throw NotSupportedException; the exception
        // type is kept as-is here so that existing callers are not affected.
        throw new NotImplementedException();
    }

    /// <summary>
    /// Reads one byte, serving the captured initial bytes first and then the wrapped stream.
    /// </summary>
    public override int ReadByte()
    {
        if (m_start < m_end)
        {
            return m_buffer[m_start++];
        }
        else
        {
            return m_stream.ReadByte();
        }
    }

    public override void Flush()
    {
        m_stream.Flush();
    }

    /// <summary>
    /// Disposes the wrapped stream when called from Dispose().
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            m_stream.Dispose();
        }
        base.Dispose(disposing);
    }
}

免责声明:上述的PeekStream是从一个正在运行的程序中提取出来的,但它并没有经过全面测试,所以可能存在一些错误。它对我来说是有效的,但你可能会发现一些它无法处理的特殊情况。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接