TCP server with bidirectional communication

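I have been trying to create a TCP service that does what I need, but unfortunately I am stuck on the last step.
The scenario: one server instance with 10 clients connected. When a client sends a command and receives a response, everything works fine. The last case, however, does not work.
When a client issues the "UPDATE" command, the server should send a message to all connected clients telling them that they need to do something.
Communication example: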
Comms 1
Client A          GetTime ----->          Server
Client A          <----- Time is...       Server 

Comms 2
Client A          UPDATE ------>          Server
Client A          <------- Ack            Server
Client A          <------- DoUpdate       Server
Client B          <------- DoUpdate       Server
Client C          <------- DoUpdate       Server

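Comms 1 works fine, mainly because the client calls its send and receive functions back to back. For Comms 2, I cannot work out how to achieve this, at least not without opening a second port for communication, which is not ideal.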


Current attempt, based on the Microsoft article

Server

class Program
{
    public static int Main(String[] args)
    {
        AsynchronousSocketListener.StartListening();
        return 0;
    }
}
public class StateObject
{
    // Client  socket.
    public Socket WorkSocket = null;
    // Size of receive buffer.
    public const int BufferSize = 1024;
    // Receive buffer.
    public byte[] Buffer = new byte[BufferSize];
    // Received data string.
    public StringBuilder Sb = new StringBuilder();
}

public class AsynchronousSocketListener
{
    // Thread signal.
    public static ManualResetEvent AllDone = new ManualResetEvent(false);

    public static void StartListening()
    {
        // Data buffer for incoming data.
        //var bytes = new Byte[1024];

        // Establish the local endpoint for the socket.
        // The DNS name of the computer
        // running the listener is "host.contoso.com".
    //??IPHostEntry ipHostInfo = Dns.Resolve(Dns.GetHostName());
    //??IPAddress ipAddress = ipHostInfo.AddressList[0];
    //??IPEndPoint localEndPoint = new IPEndPoint(ipAddress, 3030);

    // Create a TCP/IP socket.
    var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    // Bind the socket to the local endpoint and listen for incoming connections.
    try
    {
        listener.Bind(new IPEndPoint(IPAddress.Any, 3030));
        //listener.Bind(localEndPoint);
        listener.Listen(100);

        while (true)
        {
            // Set the event to nonsignaled state.
            AllDone.Reset();

            // Start an asynchronous socket to listen for connections.
            Console.WriteLine("Waiting for a connection...");
            listener.BeginAccept((AcceptCallback), listener);

            // Wait until a connection is made before continuing.
            AllDone.WaitOne();
        }

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }

    Console.WriteLine("\nPress ENTER to continue...");
    Console.Read();

}

public static void AcceptCallback(IAsyncResult ar)
{
    // Signal the main thread to continue.
    AllDone.Set();

    // Get the socket that handles the client request.
    var listener = (Socket)ar.AsyncState;
    Socket handler = listener.EndAccept(ar);

    // Create the state object.
    var state = new StateObject {WorkSocket = handler};
    handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
}

public static void ReadCallback(IAsyncResult ar)
{
    // Retrieve the state object and the handler socket
    // from the asynchronous state object.
    var state = (StateObject)ar.AsyncState;
    Socket handler = state.WorkSocket;

    // Read data from the client socket. 
    int bytesRead = handler.EndReceive(ar);

    if (bytesRead > 0)
    {
        // There  might be more data, so store the data received so far.
        state.Sb.Append(Encoding.ASCII.GetString(
            state.Buffer, 0, bytesRead));

        // Check for end-of-file tag. If it is not there, read 
        // more data.
        var content = state.Sb.ToString();
        if (content.IndexOf("<EOF>", StringComparison.Ordinal) > -1)
        {
            // All the data has been read from the 
            // client. Display it on the console.
            Console.WriteLine("Read {0} bytes from socket. \n Data : {1}",
                content.Length, content);
            // Echo the data back to the client.
            Send(handler, content);
        }
        else
        {
            // Not all data received. Get more.
            handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
        }
    }
}

private static void Send(Socket handler, String data)
{
    // Convert the string data to byte data using ASCII encoding.
    var byteData = Encoding.ASCII.GetBytes(data);

    // Begin sending the data to the remote device.
    handler.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, handler);
}

private static void SendCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var handler = (Socket)ar.AsyncState;

        // Complete sending the data to the remote device.
        int bytesSent = handler.EndSend(ar);
        Console.WriteLine("Sent {0} bytes to client.", bytesSent);

        handler.Shutdown(SocketShutdown.Both);
        handler.Close();

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

/*
public static int Main(String[] args)
{
    StartListening();
    return 0;
}
 * */
}

Client code
class Program
{
    public static int Main(String[] args)
    //static void Main(string[] args)
    {
        Console.Title = "Client ";
        AsynchronousClient.StartClient();
        Console.ReadLine();
        return 0;
    }
}

public class StateObject
{
// Client socket.
public Socket WorkSocket = null;
// Size of receive buffer.
public const int BufferSize = 256;
// Receive buffer.
public byte[] Buffer = new byte[BufferSize];
// Received data string.
public StringBuilder Sb = new StringBuilder();
}

public class AsynchronousClient
{
// The port number for the remote device.
private const int Port = 3030;

// ManualResetEvent instances signal completion.
private static readonly ManualResetEvent ConnectDone =
    new ManualResetEvent(false);
private static readonly ManualResetEvent SendDone =
    new ManualResetEvent(false);
private static readonly ManualResetEvent ReceiveDone =
    new ManualResetEvent(false);

// The response from the remote device.
private static String _response = String.Empty;

public static void StartClient()
{
    // Connect to a remote device.
    try
    {
        // Establish the remote endpoint for the socket.
        // The name of the 
        // remote device is "host.contoso.com".
        //IPHostEntry ipHostInfo = Dns.Resolve("host.contoso.com");
        //??IPHostEntry ipHostInfo = Dns.Resolve("localhost");
        //??IPAddress ipAddress = ipHostInfo.AddressList[0];
        //??IPEndPoint remoteEP = new IPEndPoint(ipAddress, port);

        // Create a TCP/IP socket.
        var client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        // Connect to the remote endpoint.

        //client.BeginConnect(remoteEP,
            //new AsyncCallback(ConnectCallback), client);
        var remoteEP = new IPEndPoint(IPAddress.Parse("127.0.0.1"), Port);
        client.BeginConnect(remoteEP, ConnectCallback, client);
        ConnectDone.WaitOne();

        // set receive to another thread so we can constantly receive, doesn't work as intended
        //var thread = new Thread(() => ReadThread(client));
        //thread.Start();

        // Send test data to the remote device.
        Send(client, "This is a test<EOF>");
        SendDone.WaitOne();

        //test remove
        // Receive the response from the remote device.
        Receive(client);
        ReceiveDone.WaitOne();

        // Write the response to the console.
        Console.WriteLine("Response received : {0}", _response);

        // Release the socket.
        //client.Shutdown(SocketShutdown.Both);
        //client.Close();

    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

// doesn't work as expected
private static void ReadThread(object ar)
{
    var client = (Socket)ar;
    while (true)
    {
        Receive(client);
        ReceiveDone.WaitOne();

        // Write the response to the console.
        Console.WriteLine("Response received : {0}", _response);
    }
}

private static void ConnectCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var client = (Socket)ar.AsyncState;

        // Complete the connection.
        client.EndConnect(ar);

        Console.WriteLine("Socket connected to {0}", client.RemoteEndPoint);

        // Signal that the connection has been made.
        ConnectDone.Set();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void Receive(Socket client)
{
    try
    {
        // Create the state object.
        var state = new StateObject {WorkSocket = client};

        // Begin receiving the data from the remote device.
        client.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReceiveCallback, state);
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void ReceiveCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the state object and the client socket 
        // from the asynchronous state object.
        var state = (StateObject)ar.AsyncState;
        Socket client = state.WorkSocket;

        // Read data from the remote device.
        int bytesRead = client.EndReceive(ar);

        if (bytesRead > 0)
        {
            // There might be more data, so store the data received so far.
            state.Sb.Append(Encoding.ASCII.GetString(state.Buffer, 0, bytesRead));

            // Get the rest of the data.
            client.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReceiveCallback, state);
        }
        else
        {
            // All the data has arrived; put it in response.
            if (state.Sb.Length > 1)
            {
                _response = state.Sb.ToString();
            }
            // Signal that all bytes have been received.
            ReceiveDone.Set();
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

private static void Send(Socket client, String data)
{
    // Convert the string data to byte data using ASCII encoding.
    var byteData = Encoding.ASCII.GetBytes(data);

    // Begin sending the data to the remote device.
    client.BeginSend(byteData, 0, byteData.Length, 0, SendCallback, client);
}

private static void SendCallback(IAsyncResult ar)
{
    try
    {
        // Retrieve the socket from the state object.
        var client = (Socket)ar.AsyncState;

        // Complete sending the data to the remote device.
        int bytesSent = client.EndSend(ar);
        Console.WriteLine("Sent {0} bytes to server.", bytesSent);

        // Signal that all bytes have been sent.
        SendDone.Set();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());
    }
}

/*
public static int Main(String[] args)
{
    StartClient();
    return 0;
}
*/
}

Previously working system

Server:

class Program
{

    private static byte[] buffer = new byte[1024];
    public static Socket _serverSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    public static List<Socket> clientSockets = new List<Socket>();

    static void Main(string[] args)
    {
        Console.Title = "Server, " + clientSockets.Count + " clients are connected";
        SetupServer();
        Console.ReadLine();
    }
    public static void SetupServer()
    {
        Console.WriteLine("Setting up server...");
        _serverSocket.Bind(new IPEndPoint(IPAddress.Any, 3030));
        _serverSocket.Listen(10);
        _serverSocket.BeginAccept(AcceptCallback, null);
        Console.ReadLine();// stops cmd from closing
    }

    public static void AcceptCallback(IAsyncResult AR)
    {
        Socket socket = _serverSocket.EndAccept(AR);
        if (!clientSockets.Contains(socket))
            clientSockets.Add(socket);
        IPEndPoint remoteIPEndPoint = socket.RemoteEndPoint as IPEndPoint;

        Console.WriteLine(remoteIPEndPoint.Address);

        Console.WriteLine("Client Connected");
        Console.Title = "Server, " + clientSockets.Count + " clients are connected";
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
        _serverSocket.BeginAccept(AcceptCallback, null);
    }

    private static void RecieveCallBack(IAsyncResult AR)
    {
        var socket = (Socket)AR.AsyncState;
        int received = socket.EndReceive(AR);
        var databuff = new byte[received];
        Array.Copy(buffer, databuff, received);

        string s = Encoding.ASCII.GetString(databuff);
        Console.WriteLine("Text Received: " + s);
        string response = string.Empty;

        switch (s.ToLower())
        {
            case "get time":
                response = DateTime.Now.ToLongTimeString();
                break;
            case "hello":
                response = "olleh";
                break;
            case "update clients":
                response = "";
                SendData("Ack", socket);
                doUpdateClients();
                break;
            default:
                response = "Invavlid Request";
                break;
        }

        SendData(response, socket);
    }

    private static void SendData(string Data, Socket socket)
    {
        byte[] data = Encoding.ASCII.GetBytes(Data);
        socket.BeginSend(data, 0, data.Length, SocketFlags.None, sendCallback, socket);
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
    }

    private static void doUpdateClients()
    {
        // need to send an update message to all the clients
        var upd = new Thread((UpdateClients));
        upd.Start();
    }

    private static void UpdateClients()
    {
        Thread.Sleep(5000);
        foreach (var sock in clientSockets)
        {
            SendData("UpdateClients", sock);
        }
    }
    private static void sendCallback(IAsyncResult AR)
    {
        var socket = (Socket)AR.AsyncState;
        socket.EndSend(AR);
    }
    //
}

}

Client:

class Program
{

    private static byte[] buffer = new byte[1024];
    public static Socket _clientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    static void Main(string[] args)
    {
        Console.Title = "Client ";
        LoopConnect();
        //ReceiveLoopStart();
        //_clientSocket.Listen(10);
        SendLoop();
        Console.ReadLine();
    }

    private static void LoopConnect()
    {
        while (!_clientSocket.Connected)
        {
            try
            {
                _clientSocket.Connect(IPAddress.Parse("127.0.0.1"), 3030);
            }
            catch (SocketException se)
            {

            }
        }
        Console.WriteLine("Connected");
    }
    private static void ReceiveLoopStart()
    {
        //_clientSocket.Bind(new IPEndPoint(IPAddress.Any, 3030));
        //_clientSocket.Listen(10);
        _clientSocket.BeginAccept(AcceptCallback, null);

        Thread receiveThread = new Thread(ReceiveLoop);
        receiveThread.Start();
    }

    private static void ReceiveLoop()
    {
        _clientSocket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, _clientSocket);
        _clientSocket.BeginAccept(AcceptCallback, null);
    }

    private static void RecieveCallBack(IAsyncResult AR)
    {
        int received = _clientSocket.EndReceive(AR);
        var databuff = new byte[received];
        Array.Copy(buffer, databuff, received);

        string s = Encoding.ASCII.GetString(databuff);
        Console.WriteLine("Text Received: " + s);
        string response = string.Empty;

        switch (s.ToLower())
        {
            case "get time":
                response = DateTime.Now.ToLongTimeString();
                break;
            case "hello":
                response = "olleh";
                break;
            default:
                response = "Invavlid Request";
                break;
        }

    }
    public static void AcceptCallback(IAsyncResult AR)
    {
        Socket socket = _clientSocket.EndAccept(AR);

        Console.WriteLine("Client Connected");
        socket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, RecieveCallBack, socket);
        _clientSocket.BeginAccept(AcceptCallback, null);
    }



    private static void SendLoop()
    {
        while (true)
        {
            Console.Write("Enter Request: ");
            string req = Console.ReadLine();

            var buffer = Encoding.ASCII.GetBytes(req);
            _clientSocket.Send(buffer);

            var tempBuff = new byte[1024];
            int rec = _clientSocket.Receive(tempBuff);

            var data = new byte[rec];
            Array.Copy(tempBuff, data, rec);

            Console.WriteLine("Received: " + Encoding.ASCII.GetString(data));
        }
    }


}
}

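Don't you need to start a listening thread on the client? You are sending the update from the server, but I cannot see that your client is listening. It only listens right after it has sent a message. - Oddmar Dam
I have tried several approaches for both the server and the client code. I did not want to post all of them because that might cause confusion, but I will post the code for each attempt. - Neo
@Neo, your design is flawed because all of your threads are blocked in read operations. It is as simple as that. You need to handle all clients in the same thread and use the select function. - Philip Stuyck
Normally each connection has no influence on the others, and if that is the case you do not need select. By the way, threads are obsolete as well, now that there are async and await (see Stephen Cleary's book "Concurrency in C# Cookbook"). - Philip Stuyck
Threads are not obsolete. They win on simplicity and productivity. Being able to pause the debugger and see what the application is doing is very valuable. Async IO is for GUI applications, for high concurrency, and for saving a lot of memory. - usr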
2 Answers

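I do not have a C# example at hand, but what you need to learn is how to use the select API.
You only need one thread for this. The same thread can handle every socket that is in use at any given time.
If nobody is connected, you have only the listening socket. You use the select API to watch what happens on that socket. If no timeout is specified, select blocks. When the listening socket is reported as readable, it means you can call accept. The result of accept is another socket, so you now have 2 sockets in the select. Calling select again blocks until one of those sockets has something available. Perhaps it is the listening socket again, so after calling accept you get yet another socket and now have 3 sockets in the select. Suppose one of the accepted sockets now has data available: you will see that through correct use of the select API. You can then send data on any of the sockets except the listening socket, because it is not used for sending data.
More information can be found here: http://www.codeproject.com/Articles/20066/A-scalable-client-server-using-select-socket-funct
It uses what I have explained and gives a more detailed description.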
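
The loop described above can be sketched in C# with `Socket.Select`. This is only a minimal illustration under several assumptions, not the code from the linked article: the `SelectServer` class, its `PollOnce` method, and the newline-terminated replies are hypothetical names and choices made for the example, and it assumes each `Receive` call returns exactly one whole command, which TCP does not guarantee.

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Hypothetical single-threaded server: one select call watches the
// listening socket and every accepted client socket.
public sealed class SelectServer
{
    private readonly Socket _listener;
    private readonly List<Socket> _clients = new List<Socket>();

    public SelectServer(int port)
    {
        _listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listener.Bind(new IPEndPoint(IPAddress.Loopback, port));
        _listener.Listen(100);
    }

    // The port actually bound (useful when the constructor was given 0).
    public int Port => ((IPEndPoint)_listener.LocalEndPoint).Port;

    // One pass of the loop: wait for readable sockets, then service them.
    public void PollOnce(int timeoutMicroseconds)
    {
        // Select trims this list down to the sockets that are readable.
        var readable = new List<Socket>(_clients) { _listener };
        Socket.Select(readable, null, null, timeoutMicroseconds);

        foreach (Socket s in readable)
        {
            if (s == _listener)
            {
                // A readable listening socket means a connection is pending.
                _clients.Add(_listener.Accept());
                continue;
            }

            var buffer = new byte[1024];
            int n;
            try { n = s.Receive(buffer); }
            catch (SocketException) { n = 0; }

            if (n == 0)
            {
                // Zero bytes (or an error) means the client has gone away.
                _clients.Remove(s);
                s.Close();
                continue;
            }

            Handle(s, Encoding.ASCII.GetString(buffer, 0, n).Trim());
        }
    }

    private void Handle(Socket sender, string command)
    {
        if (command.Equals("UPDATE", StringComparison.OrdinalIgnoreCase))
        {
            sender.Send(Encoding.ASCII.GetBytes("Ack\n"));
            // Push to every connected client over its existing connection.
            foreach (Socket c in _clients)
            {
                c.Send(Encoding.ASCII.GetBytes("DoUpdate\n"));
            }
        }
        else if (command.Equals("GetTime", StringComparison.OrdinalIgnoreCase))
        {
            string reply = "Time is " + DateTime.Now.ToLongTimeString() + "\n";
            sender.Send(Encoding.ASCII.GetBytes(reply));
        }
    }
}
```

A server would call `PollOnce(-1)` in a `while (true)` loop, where -1 makes `Socket.Select` block until something happens. The clients still need to be reading continuously to see the `DoUpdate` push, and a production version would also need error handling around the broadcast `Send` calls.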

Thanks, I spent a while googling but could not find what I needed. - Neo


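I do not want to get into your design, but it looks like you do not even keep a reference to the other clients. Why not keep a collection of the sockets, like this: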

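 // Every accepted client socket, so that any callback can reach all of them.
 private static readonly List<Socket> _handlers = new List<Socket>();

 public static void AcceptCallback(IAsyncResult ar)
 {
     // Signal the main thread to continue accepting.
     AllDone.Set();

     // The listening socket was passed to BeginAccept as the state object.
     var listener = (Socket)ar.AsyncState;
     Socket handler = listener.EndAccept(ar);
     var state = new StateObject {WorkSocket = handler};
     _handlers.Add(handler);
     handler.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
 }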

Then, depending on the message type or some other condition, you notify those clients (all of them, not just the one currently passed to ReadCallback).
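 public static void ReadCallback(IAsyncResult ar)
 {
     var state = (StateObject)ar.AsyncState;
     int bytesRead = state.WorkSocket.EndReceive(ar);
     if (bytesRead <= 0) return;

     state.Sb.Append(Encoding.ASCII.GetString(state.Buffer, 0, bytesRead));
     string data = state.Sb.ToString();
     if (data.IndexOf("<EOF>", StringComparison.Ordinal) > -1)
     {
         // A complete message arrived: forward it to every connected client.
         // (SendCallback must not close the socket for this to work.)
         state.Sb.Clear();
         foreach (var h in _handlers)
         {
             Send(h, data);
         }
     }
     // Keep reading so this client can send further commands.
     state.WorkSocket.BeginReceive(state.Buffer, 0, StateObject.BufferSize, 0, ReadCallback, state);
 }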
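
Because the asynchronous callbacks can run on different thread-pool threads, a shared socket list probably needs some synchronisation. One possible way to wrap the collection is sketched below. The `ClientRegistry` class and its members are hypothetical names introduced for this example, and it uses the blocking `Send` for brevity where the question's code uses `BeginSend`.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;

// Hypothetical helper: a lock-protected list of connected client sockets.
public sealed class ClientRegistry
{
    private readonly object _gate = new object();
    private readonly List<Socket> _handlers = new List<Socket>();

    public void Add(Socket handler)
    {
        lock (_gate) { _handlers.Add(handler); }
    }

    public void Remove(Socket handler)
    {
        lock (_gate) { _handlers.Remove(handler); }
    }

    public int Count
    {
        get { lock (_gate) { return _handlers.Count; } }
    }

    // Sends the message to every registered client and returns how many
    // sends succeeded. Clients whose send fails are dropped from the list.
    public int Broadcast(string message)
    {
        byte[] payload = Encoding.ASCII.GetBytes(message);

        // Copy under the lock so the sends happen outside it and the list
        // can change while we iterate.
        Socket[] snapshot;
        lock (_gate) { snapshot = _handlers.ToArray(); }

        int delivered = 0;
        foreach (Socket h in snapshot)
        {
            try
            {
                h.Send(payload);
                delivered++;
            }
            catch (SocketException) { Remove(h); }
            catch (ObjectDisposedException) { Remove(h); }
        }
        return delivered;
    }
}
```

In the accept callback you would call `Add(handler)`, and in the read callback `Broadcast("DoUpdate<EOF>")` once an "UPDATE" command has been recognised. Note that the server code in the question calls `Shutdown` and `Close` in `SendCallback` after every reply. Those calls would have to go, otherwise there is no open connection left to push to.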

Please check the additional code I added under "Previously working system". - Neo
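That is a lot of code. I have just looked at it, and your previous system does work. Unless you set a timeout on BeginReceive and let the loop iterate over each client again, how could you signal them? That is a polling mechanism. - eran otzap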
