如何在C/C++中进行TCP连接池?

8
我正在用C++设计一个分布式的服务端/客户端系统,其中许多客户端通过TCP向许多服务器发送请求,服务器则抛出一个线程来处理请求并将其响应发送回来。在我的用例中,只有有限数量的客户端将访问服务器,并且我需要非常高的性能。从客户端和服务器发送的数据都很小,但非常频繁。因此,创建连接并在使用后关闭它是昂贵的。因此,我想使用连接缓存来解决这个问题:一旦连接创建,它将被存储在缓存中以供将来使用。(假设客户端的数量不超过缓存大小)。
我的问题是:
  1. 我看到有人说连接池是客户端技术。如果这个连接池仅在客户端使用,则第一次创建与服务器的连接,并发送数据。这个建立连接的动作会触发服务器端的accept()函数,返回一个套接字以接收来自客户端的数据。因此,当客户端想要使用现有的连接(在缓存中),它不会创建新的连接,而只是发送数据。问题是,如果没有创建连接,谁会触发服务器端的accept()函数并抛出一个线程?
  2. 如果连接池也需要在服务器端实现,我如何知道请求来自哪里?由于只有从accept()中可以获得客户端地址,但同时accept()已经为该请求创建了一个新的套接字,因此使用缓存连接就毫无意义了。
任何答案和建议都将不胜感激。或者有人可以给我一个连接池或连接缓存的例子吗?

我想这么做,但不知道如何在多线程情况下实现。正如我在问题中所说,我使用accept()来获取即将到来的请求。如果使用一个始终打开的连接,我如何知道何时有请求到来? - Tony
调查一下 boost::asio,可能会让你的生活更轻松... - Nim
Asio 应该是一个不错的建议,但是似乎它的性能不够好,而我需要尽可能高的速度和尽可能少的依赖。谢谢您的建议。 - Tony
1
为什么需要性能?首先集中精力于连接池的需要似乎更合理。如果您不知道将来是否会再次需要它们,为什么要保持连接开放?您如何知道将来会需要它们? - wildplasser
1
@Tony,一个简单的互斥锁就足够了。 - J.N.
显示剩余6条评论
1个回答

13
首先,连接池不仅仅是客户端技术;它是一种连接模式技术。它适用于两种类型的对等方(“服务器”和“客户端”)。
其次,并不需要调用accept来启动线程。程序可以出于任何原因启动线程...他们可以只为了启动更多的线程而启动线程,在一个大规模的线程创建循环中。(编辑:我们称之为“fork bomb”)
最后,高效的线程池实现不会为每个客户端启动一个线程。每个线程通常占用512KB-4MB(计算堆栈空间和其他上下文信息),因此如果您有10000个客户端,每个客户端都占用这么多,那将浪费大量内存。
引用:“我想这样做,但不知道如何在多线程情况下实现。”

您在这里不应该使用多线程...至少在您拥有使用单个线程的解决方案并且您确定它不够快之前,不要使用多线程。目前,您没有这些信息;您只是在猜测,并且猜测不能保证优化。

在世纪之交,有FTP服务器解决了C10K问题;他们能够处理10000个客户端,在任何时候都可以浏览、下载或空闲,就像用户在FTP服务器上做的那样。他们解决了这个问题,而不是使用线程,而是使用非阻塞和/或异步套接字和/或调用

为了澄清,这些Web服务器在单个线程上处理数千个连接!一种典型的方法是使用select,但我不特别喜欢这种方法,因为它需要一系列相当丑陋的循环。我更喜欢使用ioctlsocket来设置Windows和其他POSIX操作系统中的文件描述符为非阻塞模式,例如:

#ifdef WIN32
ioctlsocket(fd, FIONBIO, (u_long[]){1});
#else
fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
#endif

此时,当在fd上操作时,recvread不会阻塞;如果没有可用数据,则它们将立即返回错误值,而不是等待数据到达。这意味着您可以循环多个套接字。
“如果服务器端还需要实现连接池,我该如何知道请求来自哪里?”
将客户端的fd与其struct sockaddr_storage以及您需要存储有关客户端的任何其他有状态信息一起存储在您声明的结构体中。如果这最终成为4KB(这是相当大的结构体,通常是它们需要变得越来越大),则其中10000个仅占用约40000KB(~40MB)。即使是今天的手机也应该没有处理这个问题的问题。考虑根据您的需求完成以下代码:
struct client {
    struct sockaddr_storage addr;
    socklen_t addr_len;
    int fd;
    /* Other stateful information */
};

#define BUFFER_SIZE 4096
#define CLIENT_COUNT 10000

int main(void) {
    int server;
    struct client client[CLIENT_COUNT] = { 0 };
    size_t client_count = 0;
    /* XXX: Perform usual bind/listen */
    #ifdef WIN32
    ioctlsocket(server, FIONBIO, (u_long[]){1});
    #else
    fcntl(server, F_SETFL, fcntl(server, F_GETFL, 0) | O_NONBLOCK);
    #endif

    for (;;) {
        /* Accept connection if possible */
        if (client_count < sizeof client / sizeof *client) {
            struct sockaddr_storage addr = { 0 };
            socklen_t addr_len = sizeof addr;
            int fd = accept(server, &addr, &addr_len);
            if (fd != -1) {
#               ifdef WIN32
                ioctlsocket(fd, FIONBIO, (u_long[]){1});
#               else
                fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
#               endif
                client[client_count++] = (struct client) { .addr = addr
                                                         , .addr_len = addr_len
                                                         , .fd = fd };
            }
        }
        /* Loop through clients */
        char buffer[BUFFER_SIZE];
        for (size_t index = 0; index < client_count; index++) {
            ssize_t bytes_recvd = recv(client[index].fd, buffer, sizeof buffer, 0);
#           ifdef WIN32
            int closed = bytes_recvd == 0
                      || (bytes_recvd < 0 && WSAGetLastError() == WSAEWOULDBLOCK);
#           else
            int closed = bytes_recvd == 0
                      || (bytes_recvd < 0 && errno == EAGAIN) || errno == EWOULDBLOCK;
#           endif
            if (closed) {
                close(client[index].fd);
                client_count--;
                memmove(client + index, client + index + 1, (client_count - index) * sizeof client);
                continue;
            }
            /* XXX: Process buffer[0..bytes_recvd-1] */
        }

        sleep(0); /* This is necessary to pass control back to the kernel,
                   * so it can queue more data for us to process
                   */
    }
}

假设您想在客户端汇集连接,则代码看起来非常相似,除了显然不需要accept相关的代码。假设您有一个要connectclient数组,您可以使用非阻塞连接调用一次执行所有连接,如下所示:
size_t index = 0, in_progress = 0;
for (;;) {
    if (client[index].fd == 0) {
        client[index].fd = socket(/* TODO */);
#       ifdef WIN32
        ioctlsocket(client[index].fd, FIONBIO, (u_long[]){1});
#       else
        fcntl(client[index].fd, F_SETFL, fcntl(client[index].fd, F_GETFL, 0) | O_NONBLOCK);
#       endif
    }
#   ifdef WIN32
    in_progress += connect(client[index].fd, (struct sockaddr *) &client[index].addr, client[index].addr_len) < 0
                && (WSAGetLastError() == WSAEALREADY
                ||  WSAGetLastError() == WSAEWOULDBLOCK
                ||  WSAGetLastError() == WSAEINVAL);
#   else
    in_progress += connect(client[index].fd, (struct sockaddr *) &client[index].addr, client[index].addr_len) < 0
                && (errno == EALREADY
                ||  errno == EINPROGRESS);
#   endif
    if (++index < sizeof client / sizeof *client) {
        continue;
    }
    index = 0;
    if (in_progress == 0) {
        break;
    }
    in_progress = 0;
}

关于优化,只要稍加调整,这个程序应该可以处理10000个客户端,不需要多线程。
尽管如此,通过将来自“互斥”集合的项与“客户端”相关联,并在非阻塞套接字操作之前使用“非阻塞pthread_mutex_trylock”,上面的循环可以适应于在同时运行多个线程时处理相同组套接字。这为所有符合POSIX标准的平台提供了一个工作模型,无论是Windows、BSD还是Linux,但它并不是完全最佳的。要达到最佳状态,我们必须进入“异步”世界,这取决于不同的系统:

可能需要将之前提到的"非阻塞套接字操作"抽象化,因为这两种异步机制在接口方面存在显著差异。不幸的是,我们必须编写抽象代码,以便我们的Windows相关代码在符合POSIX标准的系统上保持可读性。作为额外的奖励,这将允许我们将服务器处理(即accept和随后的任何内容)与客户端处理(即connect和随后的任何内容)混合在一起,因此我们的服务器循环可以变成一个客户端循环(反之亦然)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接