C#中有哪些常用的异步网络编程模式?

6
我最近写了一个简单粗糙的C#原型代理服务器,作为努力使Java Web应用程序与其他服务器上的遗留VB6应用程序通信的一部分。它非常简单:
代理服务器和客户端都使用相同的消息格式;在代码中,我使用一个ProxyMessage类来表示客户端的请求和服务器生成的响应:
public class ProxyMessage
{
   int Length; // message length (not including the length bytes themselves)
   string Body; // an XML string containing a request/response

   // writes this message instance in the proper network format to stream 
   // (helper for response messages)
   WriteToStream(Stream stream) { ... }
}

这些信息非常简单:正文的长度和消息正文本身。
我有一个独立的ProxyClient类,表示与客户端的连接。它处理代理和单个客户端之间的所有交互。
我想知道是否有设计模式或最佳实践可以简化异步套接字编程中与样板代码相关的问题?例如,您需要特别注意管理读取缓冲区,以避免意外丢失字节,并且需要跟踪当前消息处理状态的进度。在我的当前代码中,我在TcpClient.BeginRead的回调函数中完成所有这些工作,并使用一些实例变量来管理缓冲区的状态和当前消息处理状态。
下面是我传递给BeginRead的回调函数的代码,以及相关的实例变量用于上下文。代码似乎“按原样”运行良好,但我想知道是否可以重构一下使其更清晰一些(或者可能已经很清晰了?)。
private enum BufferStates 
{ 
    GetMessageLength, 
    GetMessageBody 
}
// The read buffer. Initially 4 bytes because we are initially
// waiting to receive the message length (a 32-bit int) from the client 
// on first connecting. By constraining the buffer length to exactly 4 bytes,
// we make the buffer management a bit simpler, because
// we don't have to worry about cases where the buffer might contain
// the message length plus a few bytes of the message body.
// Additional bytes will simply be buffered by the OS until we request them.
byte[] _buffer = new byte[4];

// A count of how many bytes read so far in a particular BufferState.
int _totalBytesRead = 0;

// The state of the our buffer processing. Initially, we want
// to read in the message length, as it's the first thing
// a client will send
BufferStates _bufferState = BufferStates.GetMessageLength;

// ...ADDITIONAL CODE OMITTED FOR BREVITY...

// This is called every time we receive data from
// the client.

private void ReadCallback(IAsyncResult ar)
{
    try
    {
        int bytesRead = _tcpClient.GetStream().EndRead(ar);

        if (bytesRead == 0)
        {
            // No more data/socket was closed.
            this.Dispose();
            return;
        }

        // The state passed to BeginRead is used to hold a ProxyMessage
        // instance that we use to build to up the message 
        // as it arrives.
        ProxyMessage message = (ProxyMessage)ar.AsyncState;

        if(message == null)
            message = new ProxyMessage();

        switch (_bufferState)
        {
            case BufferStates.GetMessageLength:

                _totalBytesRead += bytesRead;

                // if we have the message length (a 32-bit int)
                // read it in from the buffer, grow the buffer
                // to fit the incoming message, and change
                // state so that the next read will start appending
                // bytes to the message body

                if (_totalBytesRead == 4)
                {
                    int length = BitConverter.ToInt32(_buffer, 0);
                    message.Length = length;
                    _totalBytesRead = 0;
                    _buffer = new byte[message.Length];
                    _bufferState = BufferStates.GetMessageBody;
                }

                break;

            case BufferStates.GetMessageBody:

                string bodySegment = Encoding.ASCII.GetString(_buffer, _totalBytesRead, bytesRead);
                _totalBytesRead += bytesRead;

                message.Body += bodySegment;

                if (_totalBytesRead >= message.Length)
                {
                    // Got a complete message.
                    // Notify anyone interested.

                    // Pass a response ProxyMessage object to 
                    // with the event so that receivers of OnReceiveMessage
                    // can send a response back to the client after processing
                    // the request.
                    ProxyMessage response = new ProxyMessage();
                    OnReceiveMessage(this, new ProxyMessageEventArgs(message, response));
                    // Send the response to the client
                    response.WriteToStream(_tcpClient.GetStream());

                    // Re-initialize our state so that we're
                    // ready to receive additional requests...
                    message = new ProxyMessage();
                    _totalBytesRead = 0;
                    _buffer = new byte[4]; //message length is 32-bit int (4 bytes)
                    _bufferState = BufferStates.GetMessageLength;
                }

                break;
        }

        // Wait for more data...
        _tcpClient.GetStream().BeginRead(_buffer, 0, _buffer.Length, this.ReadCallback, message);
    }
    catch
    {
        // do nothing
    }

}

到目前为止,我唯一的真正想法是将与缓冲区相关的内容提取到单独的MessageBuffer类中,然后只需在读取回调函数中按顺序附加新字节即可。 MessageBuffer将会担心当前的BufferState等问题,并在接收完整消息时触发事件,ProxyClient随后可以将其传播到主代理服务器代码中,以便请求进行处理。

你不会有你为这个开发的东西的开源版本可用吗? - Maslow
4个回答

2
我曾经也遇到过类似的问题。这里是我的解决方案(根据你提供的示例进行了修改)。
我们创建一个包装器,围绕着 Stream(NetworkStream 的超类,它是 TcpClient 或其他类的超类)。它监视读取操作。当读取数据时,数据被缓存。当我们接收到长度指示器(4 个字节)时,我们检查是否已经有完整的消息(4 个字节 + 消息正文长度)。当我们有完整的消息时,我们触发 MessageReceived 事件并将消息正文传递给事件处理程序,然后从缓冲区中删除该消息。这种技术自动处理分段消息和多消息每个包的情况。
public class MessageStream : IMessageStream, IDisposable
{
    public MessageStream(Stream stream)
    {
        if(stream == null)
            throw new ArgumentNullException("stream", "Stream must not be null");

        if(!stream.CanWrite || !stream.CanRead)
            throw new ArgumentException("Stream must be readable and writable", "stream");

        this.Stream = stream;
        this.readBuffer = new byte[512];
        messageBuffer = new List<byte>();
        stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
    }

    // These belong to the ReadCallback thread only.
    private byte[] readBuffer;
    private List<byte> messageBuffer;

    private void ReadCallback(IAsyncResult result)
    {
        int bytesRead = Stream.EndRead(result);
        messageBuffer.AddRange(readBuffer.Take(bytesRead));

        if(messageBuffer.Count >= 4)
        {
            int length = BitConverter.ToInt32(messageBuffer.Take(4).ToArray(), 0);  // 4 bytes per int32

            // Keep buffering until we get a full message.

            if(messageBuffer.Count >= length + 4)
            {
                messageBuffer.Skip(4);
                OnMessageReceived(new MessageEventArgs(messageBuffer.Take(length)));
                messageBuffer.Skip(length);
            }
        }

        // FIXME below is kinda hacky (I don't know the proper way of doing things...)

        // Don't bother reading again.  We don't have stream access.
        if(disposed)
            return;

        try
        {
            Stream.BeginRead(readBuffer, 0, readBuffer.Length, new AsyncCallback(ReadCallback), null);
        }
        catch(ObjectDisposedException)
        {
            // DO NOTHING
            // Ends read loop.
        }
    }

    public Stream Stream
    {
        get;
        private set;
    }

    public event EventHandler<MessageEventArgs> MessageReceived;

    protected virtual void OnMessageReceived(MessageEventArgs e)
    {
        var messageReceived = MessageReceived;

        if(messageReceived != null)
            messageReceived(this, e);
    }

    public virtual void SendMessage(Message message)
    {
        // Have fun ...
    }

    // Dispose stuff here
}

1
我喜欢这里的所有答案,所以给所有回答的人+1,但最终我选择了与这个答案非常相似的东西。它简单易懂,所以我几个月后还能记得我在做什么 ;) - Mike Spross

1

我认为你使用的设计很好,这大致上是我会做的事情。我认为通过重构成额外的类/结构体并不会带来太多收益,而且从我看到的情况来看,这样做实际上会使解决方案更加复杂。

我唯一的评论是关于两个读取操作的可靠性,其中第一个总是消息长度,第二个总是消息正文。我总是对这种方法持谨慎态度,因为如果由于意外情况(例如另一端发送了错误的长度)导致它们失去同步,那么恢复起来就非常困难。相反,我会使用一个大缓冲区进行单次读取,以便始终从网络中获取所有可用数据,然后检查缓冲区以提取完整的消息。这样,如果出现问题,当前缓冲区可以被丢弃以将事物恢复到干净状态,只有当前消息会丢失,而不是停止整个服务。

实际上,如果您的消息正文很大并分别在两个接收中到达,并且下一个消息在同一时间发送其长度和前一个正文的第二部分,那么您将遇到问题。如果发生这种情况,您的消息长度将附加到前一个消息的正文中,您将陷入前一段描述的情况。


嗯,我同意如果客户端发送错误的长度将会彻底搞砸代码的方式,我也在思考如何最好地处理它。我更倾向于如果客户端没有发送正确格式的消息,那就太糟糕了。垃圾进,垃圾出;-) - Mike Spross
你最后一段找到了一个好问题。我再看自己的代码,我想你说得对,两个消息有可能会重叠。讽刺的是,我之前为了避免这个问题而调整了缓冲区的大小,但却没想过如果一个大消息被网络分割会发生什么。糟糕了。 - Mike Spross
实际上,经过进一步思考,我认为我的方法仍然有效。在操作系统级别上,套接字缓冲区可能包含一个消息的结尾和下一个消息的开头;但是,BeginRead/EndRead保证只读取适合的字节数... - Mike Spross
在传递给BeginRead的缓冲区中,特别是,BeginRead不会读取超过(buffer.Length - current_offset_in_buffer)字节。如果有来自另一条消息的更多字节,则它们将保留在套接字缓冲区中,直到下一次BeginRead调用。除此之外... - Mike Spross
TCP/IP协议保证客户端不会发送超过接收套接字可以处理的字节数(“滑动窗口”),因此即使低级套接字缓冲区也不会溢出,除非TCP/IP堆栈存在错误。所以,我认为我没问题。不过你的第一个观点仍然是正确的。 - Mike Spross
真实情况是,你永远不能得到比缓冲区大小更多的字节,但通常你会得到更少的字节,这就是你会遇到问题的地方。如果你在一次读取中只得到了一半的消息,下一次读取仍然会有同样大小的缓冲区可用,并且每次读取都会给它完整的容量。 - sipsorcery

1

您可以使用 yield return 来自动生成异步回调状态机。Jeffrey Richter 通过他的 AsyncEnumerator 类推广了这种技术,我也在 这里 尝试过。


1

你的做法没有问题。但是对我来说,我喜欢将数据的接收与处理分开,这就是你提出的MessageBuffer类的思路。我在这里详细讨论了这个问题。


是的,这正是我所想的:希望将接收和实际消息处理逻辑分开。 - Mike Spross
我在问题中没有提到的是代理服务器最终将需要处理不同的协议(它基本上会在一个常见的代理消息格式中隧道传递消息),因此我想寻找一种根据被隧道协议分派不同逻辑的好方法。 - Mike Spross

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接