C# Socket.BeginReceive/EndReceive

11
Socket.BeginReceive/EndReceive函数的调用顺序是怎样的?
例如,我调用了两次BeginReceive,一次获取消息长度,第二次获取消息本身。现在的场景是这样的,对于我发送的每个消息,我都开始等待其完成(实际上是消息发送的确认,同时在接收到确认后,我会等待操作完成),所以我会在每个BeginSend中调用BeginReceive,但在每个BeginReceive的回调中,我会检查我是否正在接收长度或消息。如果我接收的是消息并且已经完全接收到,那么我会调用另一个BeginReceive来接收操作的完成情况。现在问题就出在这里了。因为我的其中一个接收回调正在接收字节,并将其解释为消息的长度,而实际上它是消息本身。
现在该如何解决呢?
编辑:这是一个C#.NET的问题 :)
以下是代码,基本上太长了,抱歉
public void Send(string message)
{
    try
    {
        bytesSent = 0;

        writeDataBuffer = System.Text.Encoding.ASCII.GetBytes(message);
        writeDataBuffer = WrapMessage(writeDataBuffer);
        messageSendSize = writeDataBuffer.Length;

        clientSocket.BeginSend(writeDataBuffer, bytesSent, messageSendSize, SocketFlags.None,
                            new AsyncCallback(SendComplete), clientSocket);
    }
    catch (SocketException socketException)
    {
        MessageBox.Show(socketException.Message);
    }
}

public void WaitForData()
{
    try
    {
        if (!messageLengthReceived)
        {
            clientSocket.BeginReceive(receiveDataBuffer, bytesReceived, MESSAGE_LENGTH_SIZE - bytesReceived,
                                    SocketFlags.None, new AsyncCallback(RecieveComplete), clientSocket);
        }
}

public void Send(string message)
{
    try
    {
        bytesSent = 0;

        writeDataBuffer = System.Text.Encoding.ASCII.GetBytes(message);
        writeDataBuffer = WrapMessage(writeDataBuffer);
        messageSendSize = writeDataBuffer.Length;

        clientSocket.BeginSend(writeDataBuffer, bytesSent, messageSendSize, SocketFlags.None,
                            new AsyncCallback(SendComplete), clientSocket);
    }
    catch (SocketException socketException)
    {
        MessageBox.Show(socketException.Message);
    }
}

public void WaitForData()
{
    try
    {
        if (! messageLengthReceived)
        {
            clientSocket.BeginReceive(receiveDataBuffer, bytesReceived, MESSAGE_LENGTH_SIZE - bytesReceived,
                                    SocketFlags.None, new AsyncCallback(RecieveComplete), clientSocket);
        }
        else 
        {
            clientSocket.BeginReceive(receiveDataBuffer, bytesReceived, messageLength - bytesReceived,
                                    SocketFlags.None, new AsyncCallback(RecieveComplete), clientSocket);
        }
    }
    catch (SocketException socketException)
    {
        MessageBox.Show(socketException.Message);
    }
}

public void RecieveComplete(IAsyncResult result)
{
    try
    {
        Socket socket = result.AsyncState as Socket;
        bytesReceived = socket.EndReceive(result);

        if (! messageLengthReceived)
        {
            if (bytesReceived != MESSAGE_LENGTH_SIZE)
            {
                WaitForData();
                return;
            }

            // unwrap message length
            int length = BitConverter.ToInt32(receiveDataBuffer, 0);
            length = IPAddress.NetworkToHostOrder(length);

            messageLength = length;
            messageLengthReceived = true;

            bytesReceived = 0;

            // now wait for getting the message itself
            WaitForData();
        }
        else
        {
            if (bytesReceived != messageLength)
            {
                WaitForData();
            }
            else
            {
                string message = Encoding.ASCII.GetString(receiveDataBuffer);

                MessageBox.Show(message);

                bytesReceived = 0;
                messageLengthReceived = false;

                // clear buffer
                receiveDataBuffer = new byte[AsyncClient.BUFFER_SIZE];

                WaitForData();
            }
        }
    }
    catch (SocketException socketException)
    {
        MessageBox.Show(socketException.Message);
    }

}

public void SendComplete(IAsyncResult result)
{
    try
    {
        Socket socket = result.AsyncState as Socket;
        bytesSent = socket.EndSend(result);

        if (bytesSent != messageSendSize)
        {
            messageSendSize -= bytesSent;

            socket.BeginSend(writeDataBuffer, bytesSent, messageSendSize, SocketFlags.None,
                            new AsyncCallback(SendComplete), clientSocket);
            return;
        }

        // wait for data
        messageLengthReceived = false;
        bytesReceived = 0;

        WaitForData();
    }
    catch (SocketException socketException)
    {
        MessageBox.Show(socketException.Message);
    }
}

2
你能添加代码/数据示例吗? - Zenuka
主要问题似乎是(MESSAGE_LENGTH_SIZE - bytesReceived),这在第一次使用时可以正常工作,但之后将不再是正确的消息长度大小,因为bytesReceived不再为0。当存在状态对象时,使用全局变量传递此类内容有点丑陋。 - Timothy Pratley
问题不在于这个,我已经调试了很多次,问题出在我调用了三次BeginReceive,两次用于接收长度,一次用于接收消息,但它们的调用顺序是先调用接收长度的回调,再调用消息的回调,导致消息被打断。所以我只需要知道回调的调用顺序。 - akif
你能解决这个问题吗?我也遇到了同样的问题。自你提问以来已经过了很长时间了,我猜你一定找到了解决办法。能否给我一些帮助?谢谢。 - sawyer
5个回答

22

按时间顺序应为:

  1. 使用BeginReceive接收消息长度。
  2. 使用EndReceive完成#1的接收。
  3. 使用BeginReceive接收消息正文。
  4. 使用EndReceive完成#3的接收。

例如,如果不使用回调,您可以执行以下操作:

var sync = socket.BeginReceive(....);
sync.AsyncWaitHandle.WaitOne();
var res = socket.EndReceive(sync);
sync = socket.BeginReceive(....);
sync.AsyncWaitHandle.WaitOne();
var res2 = socket.EndReceive(sync);

但是,你最好只使用Receive

我认为你可能会发现对于这两个不同的receive,使用单独的处理程序会更容易:

... Start(....) {
    sync = socket.BeginReceive(.... MessageLengthReceived, null);
}

private void MessageLengthReceived(IAsyncResult sync) {
  var len = socket.EndReceive(sync);
  // ... set up buffer etc. for message receive

 sync = socket.BeginReceive(... MessageReceived, null);
}

private void MessageReceived(IAsyncResult sync) {
  var len = socket.EndReceive(sync);
  // ... process message
}

最终,将所有相关内容放入状态对象中并通过 BeginReceive 传递(在完成委托访问时通过 IAsyncResult.AsyncState)可以使事情变得更加容易,但需要从命令式代码的线性思维转变为完全拥抱事件驱动方法。
2012补充:

.NET 4.5版本

使用C#5中的异步支持有一个新选项。这会使用编译器从内联代码生成手动延续(单独的回调方法)和闭包(状态)。不过,还有两件事要解决:
  1. System.Net.Sockets.Socket虽然具有各种“…Async”方法,但这些方法是基于事件的异步模式,而不是C#5的基于任务的模式,即使用TaskFactory.FromAsyncBegin...End...对获取单个Task<T>
  2. TaskFactory.FromAsync只支持传递三个附加参数(除了回调和状态)到Begin...。 解决方案:lambda不需要附加参数,具有正确的签名,C#将为我们提供正确的闭包以传递参数。
因此,使用Message作为另一种类型可以更加充分地实现,该类型处理将长度编码在一些固定字节数中的初始发送,然后将内容字节转换为内容缓冲区的长度。
private async Task<Message> ReceiveAMessage() {
  var prefix = new byte[Message.PrefixLength];

  var revcLen = await Task.Factory.FromAsync(
                         (cb, s) => clientSocket.BeginReceive(prefix, 0, prefix.Length, SocketFlags.None, cb, s),
                         ias => clientSocket.EndReceive(ias),
                         null);
  if (revcLen != prefix.Length) { throw new ApplicationException("Failed to receive prefix"); }
  
  int contentLength = Message.GetLengthFromPrefix(prefix);
  var content = new byte[contentLength];
  
  revcLen = await Task.Factory.FromAsync(
                         (cb, s) => clientSocket.BeginReceive(content, 0, content.Length, SocketFlags.None, cb, s),
                         ias => clientSocket.EndReceive(ias),
                         null);
  if (revcLen != content.Length) { throw new ApplicationException("Failed to receive content"); }
  
  return new Message(content);
}

你发现我做错了什么吗?:( 你能告诉我如何改进我的当前代码吗? - akif
@Manzoo:并不是直接的,这就是为什么我建议采用更简单的方法来保持不同的操作(消息长度和消息体)。更简单的代码将更容易调试。 - Richard
@Richard:你能提供一个使用状态对象的例子吗? - akif
@Manzoo: 这需要比我现有的时间更多(尤其是要验证它)。基本上:将所有状态(缓冲区、套接字、状态标志)放入一个辅助类型中。从操作传递到操作,而不是使用全局状态。 - Richard
你的更新示例非常有趣,但是有点难以阅读。您能否考虑包括注释或补充一些描述?我很好奇这是否可能是我在此处提出的问题的答案:https://dev59.com/qnbZa4cB1Zd3GeqPBQ_r - Niels Brinch
1
@NielsBrinch 我怀疑这个回答:它展示了如何将分离的Begin/End方法转换为与任务并行库方法相匹配的内容,从而与C#5的异步和等待兼容。如果您对我所使用的API有理解,并看到代码上方的注意事项#2,我不确定我该评论什么。 - Richard

6
也许您想要做的是链接您的回调函数:

伪代码:



// read the first 2 bytes as message length
BeginReceive(msg,0,2,-,-,new AsyncCallback(LengthReceived),-)

LengthReceived(ar) {
  StateObject so = (StateObject) ar.AsyncState;
  Socket s = so.workSocket;
  int read = s.EndReceive(ar);
  msg_length = GetLengthFromBytes(so.buffer);
  BeginReceive(so.buffer,0,msg_length,-,-,new AsyncCallback(DataReceived),-)
}

DataReceived(ar) {
  StateObject so = (StateObject) ar.AsyncState;
  Socket s = so.workSocket;
  int read = s.EndReceive(ar);
  ProcessMessage(so.buffer);
  BeginReceive(so.buffer,0,2,-,-,new AsyncCallback(LengthReceived),-)
}

请参见: http://msdn.microsoft.com/en-us/library/system.asynccallback.aspx,了解正确的示例。


1

不,我希望它是异步的。谢谢。 - akif
2
Manzoor,你可以在另一个线程上执行同步I/O。这样更容易、更可靠。 - H H

1

如果您描述发送的消息的结构,那么它将会有所帮助。

只要您只有一个未完成的BeginReceive(),它就会完成并提供线上下一个可用数据字节。如果您同时拥有多个未完成,则所有结果都是不确定的,因为.NET不能保证完成的顺序。


0

正如其他人所说,不要在此处使用全局变量 - 使用套接字状态的类。例如:

public class StateObject
{
    public const int DEFAULT_SIZE = 1024;           //size of receive buffer

    public byte[] buffer = new byte[DEFAULT_SIZE];  //receive buffer
    public int dataSize = 0;                        //data size to be received
    public bool dataSizeReceived = false;           //received data size?
    public StringBuilder sb = new StringBuilder();  //received data String
    public int dataRecieved = 0;

    public Socket workSocket = null;                //client socket.
    public DateTime TimeStamp;                      //timestamp of data
} //end class StateObject

在尝试重新发送消息之前,您应该验证套接字......您可能会遇到套接字异常。
在ReceiveComplete的“if”块中的WaitForData调用之后,您应该有一个return;。
Timothy Pratley在上面说过,一个错误将出现在第二次bytesRecieved。每次您仅测量从该EndReceive接收的bytesReceived,然后将其与messageLength进行比较。您需要保留所有bytesReceived的总和。
而您最大的错误是,在第一次调用ReceiveComplete时,您考虑了消息可能(很可能)包含除消息大小以外的更多数据 - 它很可能还包含消息的一半。您需要剥离数据大小,然后还要将消息的其余部分存储在变量message中。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接