接受多个 TCP 客户端的最佳方式是什么?

8
我有一个客户端/服务器基础架构。目前,他们使用TcpClient和TcpListener在所有客户端和服务器之间发送和接收数据。
当前的做法是,当数据被接收(在它自己的线程上),它会被放入队列中以供另一个线程处理,以便释放套接字,使其准备好并且可以接收新数据。
                // Enter the listening loop.
                while (true)
                {
                    Debug.WriteLine("Waiting for a connection... ");

                    // Perform a blocking call to accept requests.
                    using (client = server.AcceptTcpClient())
                    {
                        data = new List<byte>();

                        // Get a stream object for reading and writing
                        using (NetworkStream stream = client.GetStream())
                        {
                            // Loop to receive all the data sent by the client.
                            int length;

                            while ((length = stream.Read(bytes, 0, bytes.Length)) != 0)
                            {
                                var copy = new byte[length];
                                Array.Copy(bytes, 0, copy, 0, length);
                                data.AddRange(copy);
                            }
                        }
                    }

                    receivedQueue.Add(data);
                }

然而,我想找出是否有更好的方法来解决这个问题。例如,如果有10个客户端都想同时向服务器发送数据,则只有一个会成功,而其他所有客户端都会失败。或者,如果一个客户端连接速度慢并且占用了套接字,所有其他通信都将停止。
难道没有一种方式可以同时从所有客户端接收数据,并在其完成下载后将接收到的数据添加到队列中进行处理吗?

不要介意我自己的博客:http://jonathan.dickinsons.co.za/blog/2011/02/net-sockets-and-you/ - 它简要地涉及了异步循环,并包含了一个真实的实现(你不应该像@Jalal建议的那样使用ThreadPool)。 - Jonathan Dickinson
4个回答

19
这里是一个起步答案,比我的博客文章更适合初学者。
.Net有一个异步模式,围绕着Begin*和End*调用。例如-BeginReceiveEndReceive。它们几乎总是有它们的非异步对应项(在本例中为Receive);并且实现了完全相同的目标。
最重要的是要记住,套接字方法不仅使调用变成异步 - 它们还公开了称为IOCP(IO完成端口,Linux / Mono也有这两个但我忘了名称)的东西,在服务器上使用非常重要; IOCP的关键是,当等待数据时,您的应用程序不会消耗线程。
如何使用Begin/End模式
每个Begin*方法将与其非异步对应项相比多出2个参数。第一个是AsyncCallback,第二个是对象。这两个意思是,“你完成后要调用的方法在这里”和“我需要在该方法内部的一些数据。”始终调用具有相同签名的方法,在此方法内部调用End*对应项,以获取如果您以同步方式执行它将得到的结果。例如:
private void BeginReceiveBuffer()
{
   _socket.BeginReceive(buffer, 0, buffer.Length, BufferEndReceive, buffer);
}

private void EndReceiveBuffer(IAsyncResult state)
{
   var buffer = (byte[])state.AsyncState; // This is the last parameter.
   var length = _socket.EndReceive(state); // This is the return value of the method call.
   DataReceived(buffer, 0, length); // Do something with the data.
}

这里发生的是,.Net开始等待来自套接字的数据,一旦获得数据,它就调用EndReceiveBuffer并通过state.AsyncResult将“自定义数据”(在本例中为buffer)传递给它。当您调用EndReceive时,它将返回接收到的数据长度(或如果出现故障则抛出异常)。
更好的套接字模式
这种形式将为您提供集中的错误处理 - 它可以在任何异步模式包装流式“事物”的地方使用(例如,TCP按发送顺序到达,因此可以看作是一个Stream对象)。
private Socket _socket;
private ArraySegment<byte> _buffer;
public void StartReceive()
{
    ReceiveAsyncLoop(null);
}

// Note that this method is not guaranteed (in fact
// unlikely) to remain on a single thread across
// async invocations.
private void ReceiveAsyncLoop(IAsyncResult result)
{
    try
    {
        // This only gets called once - via StartReceive()
        if (result != null)
        {
            int numberOfBytesRead = _socket.EndReceive(result);
            if(numberOfBytesRead == 0)
            {
                OnDisconnected(null); // 'null' being the exception. The client disconnected normally in this case.
                return;
            }

            var newSegment = new ArraySegment<byte>(_buffer.Array, _buffer.Offset, numberOfBytesRead);
            // This method needs its own error handling. Don't let it throw exceptions unless you
            // want to disconnect the client.
            OnDataReceived(newSegment);
        }

        // Because of this method call, it's as though we are creating a 'while' loop.
        // However this is called an async loop, but you can see it the same way.
        _socket.BeginReceive(_buffer.Array, _buffer.Offset, _buffer.Count, SocketFlags.None, ReceiveAsyncLoop, null);
    }
    catch (Exception ex)
    {
        // Socket error handling here.
    }
}

接受多个连接

通常情况下,您需要编写一个包含套接字和异步循环的类,并为每个客户端创建一个类实例。例如:

public class InboundConnection
{
    private Socket _socket;
    private ArraySegment<byte> _buffer;

    public InboundConnection(Socket clientSocket)
    {
        _socket = clientSocket;
        _buffer = new ArraySegment<byte>(new byte[4096], 0, 4096);
        StartReceive(); // Start the read async loop.
    }

    private void StartReceive() ...
    private void ReceiveAsyncLoop() ...
    private void OnDataReceived() ...
}

你的服务器类应该跟踪每个客户端连接(这样当服务器关闭时,你可以清除它们,并进行搜索/查找)。


1
我忘了提到你也可以以同样的方式接受客户端,例如BeginAcceptTcpClient。你也可以设置异步循环相同的方式。 - Jonathan Dickinson
1
博客文章链接已失效。但是在archive.org上可以找到:https://web.archive.org/web/20121127003207/http://jonathan.dickinsons.co.za/blog/2011/02/net-sockets-and-you - Joseph Daigle
2
@JosephDaigle 谢谢提醒,我会重新托管它。 - Jonathan Dickinson
大家要记住,在将 newSegment(在 OnDataReceived 中)转换为任何类型的有效负载时,请使用 numberOfBytesRead。例如,我正在处理通过 TCP 发送的字符串消息,并且正在使用 Encoding.ASCII.GetString(dataSegment.Array, 0, dataSegment.Count);,而我应该使用 Encoding.ASCII.GetString(dataSegment.Array, 0, numberOfBytesRead);。事后看来,这是一个非常愚蠢的错误,但我花了一些时间才弄清楚。 - tkit

1

你应该使用异步读取数据的方法,例如:

// Enter the listening loop.
while (true)
{
    Debug.WriteLine("Waiting for a connection... ");

    client = server.AcceptTcpClient();

    ThreadPool.QueueUserWorkItem(new WaitCallback(HandleTcp), client);
}

private void HandleTcp(object tcpClientObject)
{
    TcpClient client = (TcpClient)tcpClientObject;
    // Perform a blocking call to accept requests.

    data = new List<byte>();

    // Get a stream object for reading and writing
    using (NetworkStream stream = client.GetStream())
    {
        // Loop to receive all the data sent by the client.
        int length;

        while ((length = stream.Read(bytes, 0, bytes.Length)) != 0)
        {
            var copy = new byte[length];
            Array.Copy(bytes, 0, copy, 0, length);
            data.AddRange(copy);
        }
    }

    receivedQueue.Add(data);
} 

此外,您应该考虑使用AutoResetEventManualResetEvent来在新数据添加到集合时得到通知,以便处理数据的线程将知道何时接收到数据。如果您正在使用4.0,最好切换到使用BlockingCollection而不是Queue


已经在使用BlockingCollection。由于我有一个专用线程来接收文件,所以我正在使用同步方法。在您上面的示例中,如果两个客户端同时连接,server.AcceptTcpClient会接受两个还是其中一个将排队等待TcpListener下一次可用(在HandleTcp之后)?另外需要注意的是,如果您正在使用.Net 4,则应该使用Task库而不是ThreadPool。 - Dylan
@Jonathan:你错了!没有通用的规则需要一直遵循。而且使用4.0并不意味着我们应该使用TPL而不是ThreadPool。有时候,使用Task“它在内部实现中使用ThreadPool”可能会过度,因为TaskIDisposable,这意味着当它完成执行时我们应该注意将其Dispose - Jalal Said
1
你永远不应该使用线程池 ThreadPool。如果连接的客户端数量超过30 * CPU计数,你将会使你的服务器缺乏工作线程,整个系统将会停滞不前。我说过,你可以在4.0中使用TPL、事件模型或者Begin/End模式,每种方法都有其优点,但是ThreadPool封装同步调用对于套接字来说只是明显的糟糕建议。 - Jonathan Dickinson
@Jonathan:这取决于在ThreadPool中执行数据所需的时间,例如在这里他只是读取数据,将其放入队列中并且Dispose流。 - Jalal Said
@EJP,您在这里的评论和踩赞是不正确的!您在评论中说:“不。他应该为每个已接受的连接使用单独的线程”,但我已经为每个已接受的连接使用了单独的线程,“ThreadPool.QueueUserWorkItem”将启动一个新线程。 - Jalal Said
显示剩余4条评论

1

您应该使用异步套接字编程来实现此操作。请查看MSDN提供的示例


0
通常我所做的是使用包含多个线程的线程池。每当有新连接时,我会在线程池中的一个线程上运行连接处理程序(在你的情况下 - 在using子句中执行的所有操作)。
通过这样做,您既可以实现性能,因为您允许同时接受几个连接,也可以限制为处理传入连接分配的资源数量(线程等)。
你可以在这里找到一个很好的例子here 祝你好运

1
再次提到“每个客户端一个线程”的事情。这真的是很糟糕的做法。 - Jonathan Dickinson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接