C++ on Linux: confusion about epoll, socket fds, and asynchronous threads

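I am trying to understand how to use the epoll library (Linux) together with socket file descriptors. As far as I can tell, information about epoll is limited. The most useful resource I have found so far is this: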

https://www.suchprogramming.com/epoll-in-3-easy-steps/

It gives a complete working example of using epoll with the stdin file descriptor. (I read elsewhere that epoll cannot be used with stdin, stdout, or stderr. I believe that information is wrong.)

I tried to take it further with an MWE (minimal working example).

#include <string>
#include <sstream>
#include <iostream>
#include <fstream>
#include <string.h>
#include <unistd.h>
#include <future>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/epoll.h>


int thread_function(int accept_sock_fd)
{
    char *read_buffer = new char[1024];

    unsigned int read_length = 0;
    std::string read_string = "";
    for(;;)
    {
        unsigned int t_read_length = read(accept_sock_fd, read_buffer, 1024 - 1);
        //std::cout << "t_read_length=" << t_read_length << std::endl;
        //std::cout << read_buffer << std::endl;
        read_string += std::string(read_buffer);
        read_length += t_read_length;

        // dodgy ?
        // tries to detect end of data using new lines
        // but what if client does not send new line char?
        if(read_buffer[t_read_length - 1] == '\n')
        {
            break;
        }
    }
    std::cout << read_string << std::endl;
    //std::cout << read_length << std::endl;
    //std::cout << read_buffer << std::endl;

    std::string status_line = "HTTP/1.0 200 OK\r\n\r\n";
    std::string html_lines = "Hello World\r\n\r\n";
    std::string write_string = status_line + html_lines;
    //std::cout << write_string << std::endl;
    unsigned int write_length = write(accept_sock_fd, write_string.data(), write_string.length());
    //std::cout << "write_length=" << write_length << std::endl;
    close(accept_sock_fd);

    delete [] read_buffer;

    return 0;
}


int main()
{

    // read configuration file
    //Config config;
    //config.ReadFromFile("config.txt");
    //std::cout << config << std::endl;

    unsigned int num_threads = 4; //config.GetNumThreads();


    ///////////////////////////////////////////////////////////////////////////
    // init epoll
    //
    //

    int epoll_fd = epoll_create(num_threads);
    if(epoll_fd < 0)
    {
        std::cout << "epoll error" << std::endl;
    }

    struct epoll_event event;
    struct epoll_event *events = new struct epoll_event[num_threads * sizeof(struct epoll_event)];
    


    ///////////////////////////////////////////////////////////////////////////
    // create socket fd
    //
    //

    int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
    if(sock_fd == 0)
    {
        std::cout << "Error creating sock_fd" << std::endl;
    }
//    int setsockopt(sockfd, 

    struct sockaddr_in server_address;
    memset(&server_address, 0, sizeof(struct sockaddr_in));
    server_address.sin_family = AF_INET;
    server_address.sin_addr.s_addr = INADDR_ANY;
    server_address.sin_port = htons(55555 /*config.GetPort()*/);

    bind(sock_fd, (struct sockaddr *)&server_address, sizeof(server_address));
    if(bind < 0)
    {
        std::cout << "Error: bind" << std::endl;
    }

    if(listen(sock_fd, 10) < 0)
    {
        std::cout << "Error: listen" << std::endl;
    }


    ///////////////////////////////////////////////////////////////////////////
    // continue epoll setup
    //
    //

    event.events = EPOLLIN; // | EPOLLPRI | EPOLLERR | EPOLLHUP;
    event.data.fd = 0; //client_sock;
    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sock_fd, &event)) // not sure about this
    {
        std::cout << "Failed to add epoll" << std::endl;
        return 1;
    }


    for(;;)
    {
        int n_fd = epoll_wait(epoll_fd, events, num_threads, 0);
        if(n_fd < 0)
        {
            std::cout << "die" << std::endl;
            break;
        }

        for(int i = 0; i < n_fd; ++ i)
        {
            int fd = events[i].data.fd;
            
            // accpt connection
            int accept_sock_fd = 0;
            struct sockaddr client_address;
            int addrlen = sizeof(client_address);
            accept_sock_fd = accept(fd, NULL, NULL); // not sure what goes here
            accept_sock_fd = accept(sock_fd, NULL, NULL); // not sure what goes here
            if(accept_sock_fd < 0)
            {
                std::cout << "Error: accept" << std::endl;
            }

            // do something to handle data on accept_sock_fd
            std::future<int> thread_future = std::async(thread_function, accept_sock_fd);
            int ret = thread_future.get();
            std::cout << ret << std::endl;

            // close connection
            close(accept_sock_fd);
            
        }
    }


    delete [] events;

    close(sock_fd);


    if(close(epoll_fd))
    {
        std::cout << "error: failed to close epoll fd" << std::endl;
    }


    return 0;
}

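I have searched extensively for more information on how to use epoll with sockets, but there isn't much out there. I suppose it's a fairly niche area.

Ideally, if anyone knows of a good resource (even a book) where I could learn more and work this out myself, that would be great. Otherwise, some advice on how to proceed would be appreciated.

What confuses me most is how to listen for events on the socket fd, and how to accept an event and handle the new fd returned by accept.

See these lines from the code above: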

// correct fd
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, /*the_server_socket_fd*/, &event);
...
// not sure about this one, particularly the waiting time of 0
for(;;)
    epoll_wait(epoll_fd, events, /*num_threads*/, 0);
    
    for(i ...)
        fd = events[i].data.fd; // what fd is this? what does it mean?
        accept(/*something here, but what*/ fd OR the_server_socket_fd ?, NULL, NULL);
        // launch thread with returned value from accept, probably

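Please try to narrow the question down. At the moment it covers both server socket handling and epoll. You can find a simple example of epoll with sockets in man epoll, e.g.: https://man7.org/linux/man-pages/man7/epoll.7.html - bartop
2 Answers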

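Using epoll is not much different from using poll. The tricky part is keeping track of which client owns a file descriptor returned by epoll (when there are many clients).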


How to use epoll to handle socket file descriptors asynchronously (here, as a TCP socket server).

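  1. Open an epoll file descriptor with epoll_create(2).
  2. Create a TCP socket with socket(2), bind(2), and listen(2).
  3. Add the main TCP socket file descriptor to epoll with epoll_ctl + EPOLL_CTL_ADD.
  4. Call epoll_wait in a loop. The program sleeps in epoll_wait, and the kernel wakes it up when an event arrives on a monitored file descriptor or when the timeout is reached.
  5. If epoll_wait returns a value greater than zero, work out which file descriptor each returned event belongs to.

5.1. If it is the main TCP file descriptor, call accept(2), then add the client file descriptor returned by accept(2) to epoll with epoll_ctl + EPOLL_CTL_ADD. Otherwise:

5.2. If it is a client file descriptor, call recv(2) and do whatever you want with that client.

In step 5.2, if you see EPOLLHUP in events[i].events, the client has closed its connection. You need to call epoll_ctl + EPOLL_CTL_DEL and close the client file descriptor. (It is safe to skip epoll_ctl + EPOLL_CTL_DEL and just close the client file descriptor, provided no duplicate of the descriptor is still open, but I prefer to remove it from epoll first.)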


  6. Go back to step 4.
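The timeout passed to epoll_wait in step 4 is in milliseconds. A value of 0 makes the call return immediately even when nothing is ready, so the `epoll_wait(..., 0)` inside `for(;;)` in the question spins the CPU. A value of -1 blocks until an event arrives. The sketch below illustrates this on a pipe rather than a socket to stay self-contained; `wait_on_pipe` is a hypothetical helper invented for the demonstration.

```c
#include <sys/epoll.h>
#include <unistd.h>

/*
 * Hypothetical helper: register the read end of a fresh pipe and return
 * what epoll_wait() reports for the given timeout (in milliseconds).
 * If `make_ready` is non-zero, one byte is written first so that the
 * read end is already readable when we wait. Returns -1 on setup errors.
 */
int wait_on_pipe(int timeout_ms, int make_ready)
{
    int pipe_fds[2];
    int n = -1;
    struct epoll_event ev = { .events = EPOLLIN };
    struct epoll_event out;
    int epoll_fd = epoll_create1(0);

    if (epoll_fd < 0)
        return -1;
    if (pipe(pipe_fds) < 0) {
        close(epoll_fd);
        return -1;
    }

    ev.data.fd = pipe_fds[0];
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fds[0], &ev) < 0)
        goto out;
    if (make_ready && write(pipe_fds[1], "x", 1) != 1)
        goto out;

    /* 0 = poll and return at once, -1 = block, >0 = wait up to N ms. */
    n = epoll_wait(epoll_fd, &out, 1, timeout_ms);
out:
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    close(epoll_fd);
    return n;
}
```

With nothing written, `wait_on_pipe(0, 0)` returns 0 straight away; with a byte pending, `wait_on_pipe(-1, 1)` returns 1 without blocking.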

For the detailed mechanism of working out which client owns a file descriptor returned by epoll, see the server.c code below.
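On the question of what `events[i].data.fd` means: `data` is a union that the kernel hands back unchanged, so it holds whatever was stored at EPOLL_CTL_ADD time. The question's code stored `event.data.fd = 0`, which is why the returned value was not the listening socket. As an alternative to an fd-to-index map, the union's `ptr` member can carry a pointer to a per-connection record. This is only a sketch of that alternative, not the approach server.c takes; `struct conn`, `watch_conn`, and `next_ready` are hypothetical names.

```c
#include <stddef.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical per-connection record; the fields are illustrative only. */
struct conn {
    int fd;
    int is_listener;
};

/* Register c->fd and ask the kernel to hand back `c` itself on each event. */
int watch_conn(int epoll_fd, struct conn *c)
{
    struct epoll_event ev = { .events = EPOLLIN };

    ev.data.ptr = c; /* data is a union: use ptr OR fd, never both */
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &ev);
}

/* Wait for one event and return the record stored at registration time. */
struct conn *next_ready(int epoll_fd, int timeout_ms)
{
    struct epoll_event ev;

    if (epoll_wait(epoll_fd, &ev, 1, timeout_ms) != 1)
        return NULL;
    return ev.data.ptr;
}
```

The caller then branches on `is_listener` instead of comparing fds. The record must stay alive for as long as the fd is registered.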


Flowchart

A flowchart may make this clearer. [Image: Flowchart Epoll]


A working example you can try at home

server.c

/*
 * https://dev59.com/ccDqa4cB1Zd3GeqPWyT7
 */
#include <stdio.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdbool.h>
#include <stdint.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>

#define PRERF "(errno=%d) %s\n"
#define PREAR(NUM) NUM, strerror(NUM)
#define EPOLL_MAP_TO_NOP (0u)
#define EPOLL_MAP_SHIFT  (1u) /* Shift to cover reserved value MAP_TO_NOP */

struct client_slot {
    bool                is_used;
    int                 client_fd;
    char                src_ip[sizeof("xxx.xxx.xxx.xxx")];
    uint16_t            src_port;
    uint16_t            my_index;
};

struct tcp_state {
    bool                stop;
    int                 tcp_fd;
    int                 epoll_fd;
    uint16_t            client_c;
    struct client_slot  clients[10];

    /*
     * Map the file descriptor to client_slot array index
     * Note: We assume there is no file descriptor greater than 10000.
     *
     * You must adjust this in production.
     */
    uint32_t            client_map[10000];
};


static int my_epoll_add(int epoll_fd, int fd, uint32_t events)
{
    int err;
    struct epoll_event event;

    /* Shut the valgrind up! */
    memset(&event, 0, sizeof(struct epoll_event));

    event.events  = events;
    event.data.fd = fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event) < 0) {
        err = errno;
        printf("epoll_ctl(EPOLL_CTL_ADD): " PRERF, PREAR(err));
        return -1;
    }
    return 0;
}



static int my_epoll_delete(int epoll_fd, int fd)
{
    int err;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
        err = errno;
        printf("epoll_ctl(EPOLL_CTL_DEL): " PRERF, PREAR(err));
        return -1;
    }
    return 0;
}


static const char *convert_addr_ntop(struct sockaddr_in *addr, char *src_ip_buf)
{
    int err;
    const char *ret;
    in_addr_t saddr = addr->sin_addr.s_addr;

    ret = inet_ntop(AF_INET, &saddr, src_ip_buf, sizeof("xxx.xxx.xxx.xxx"));
    if (ret == NULL) {
        err = errno;
        err = err ? err : EINVAL;
        printf("inet_ntop(): " PRERF, PREAR(err));
        return NULL;
    }

    return ret;
}


static int accept_new_client(int tcp_fd, struct tcp_state *state)
{
    int err;
    int client_fd;
    struct sockaddr_in addr;
    socklen_t addr_len = sizeof(addr);
    uint16_t src_port;
    const char *src_ip;
    char src_ip_buf[sizeof("xxx.xxx.xxx.xxx")];
    const size_t client_slot_num = sizeof(state->clients) / sizeof(*state->clients);


    memset(&addr, 0, sizeof(addr));
    client_fd = accept(tcp_fd, (struct sockaddr *)&addr, &addr_len);
    if (client_fd < 0) {
        err = errno;
        if (err == EAGAIN)
            return 0;

        /* Error */
        printf("accept(): " PRERF, PREAR(err));
        return -1;
    }

    src_port = ntohs(addr.sin_port);
    src_ip   = convert_addr_ntop(&addr, src_ip_buf);
    if (!src_ip) {
        printf("Cannot parse source address\n");
        goto out_close;
    }


    /*
     * Find unused client slot.
     *
     * In real world application, you don't want to iterate
     * the whole array, instead you can use stack data structure
     * to retrieve unused index in O(1).
     *
     */
    for (size_t i = 0; i < client_slot_num; i++) {
        struct client_slot *client = &state->clients[i];

        if (!client->is_used) {
            /*
             * We found unused slot.
             */

            client->client_fd = client_fd;
            memcpy(client->src_ip, src_ip_buf, sizeof(src_ip_buf));
            client->src_port = src_port;
            client->is_used = true;
            client->my_index = i;

            /*
             * We map the client_fd to client array index that we accept
             * here.
             */
            state->client_map[client_fd] = client->my_index + EPOLL_MAP_SHIFT;

            /*
             * Let's tell to `epoll` to monitor this client file descriptor.
             */
            my_epoll_add(state->epoll_fd, client_fd, EPOLLIN | EPOLLPRI);

            printf("Client %s:%u has been accepted!\n", src_ip, src_port);
            return 0;
        }
    }
    printf("Sorry, can't accept more client at the moment, slot is full\n");


out_close:
    close(client_fd);
    return 0;
}


static void handle_client_event(int client_fd, uint32_t revents,
                                struct tcp_state *state)
{
    int err;
    ssize_t recv_ret;
    char buffer[1024];
    const uint32_t err_mask = EPOLLERR | EPOLLHUP;
    /*
     * Read the mapped value to get client index.
     */
    uint32_t index = state->client_map[client_fd] - EPOLL_MAP_SHIFT;
    struct client_slot *client = &state->clients[index];

    if (revents & err_mask)
        goto close_conn;

    /* Leave one byte free for the '\0' terminator written below. */
    recv_ret = recv(client_fd, buffer, sizeof(buffer) - 1, 0);
    if (recv_ret == 0)
        goto close_conn;

    if (recv_ret < 0) {
        err = errno;
        if (err == EAGAIN)
            return;

        /* Error */
        printf("recv(): " PRERF, PREAR(err));
        goto close_conn;
    }


    /*
     * Safe printing
     */
    buffer[recv_ret] = '\0';
    if (buffer[recv_ret - 1] == '\n') {
        buffer[recv_ret - 1] = '\0';
    }

    printf("Client %s:%u sends: \"%s\"\n", client->src_ip, client->src_port,
           buffer);
    return;


close_conn:
    printf("Client %s:%u has closed its connection\n", client->src_ip,
           client->src_port);
    my_epoll_delete(state->epoll_fd, client_fd);
    close(client_fd);
    client->is_used = false;
    return;
}


static int event_loop(struct tcp_state *state)
{
    int err;
    int ret = 0;
    int timeout = 3000; /* in milliseconds */
    int maxevents = 32;
    int epoll_ret;
    int epoll_fd = state->epoll_fd;
    struct epoll_event events[32];

    printf("Entering event loop...\n");

    while (!state->stop) {

        /*
         * I sleep on `epoll_wait` and the kernel will wake me up
         * when event comes to my monitored file descriptors, or
         * when the timeout reached.
         */
        epoll_ret = epoll_wait(epoll_fd, events, maxevents, timeout);


        if (epoll_ret == 0) {
            /*
             *`epoll_wait` reached its timeout
             */
            printf("I don't see any event within %d milliseconds\n", timeout);
            continue;
        }


        if (epoll_ret == -1) {
            err = errno;
            if (err == EINTR) {
                printf("Something interrupted me!\n");
                continue;
            }

            /* Error */
            ret = -1;
            printf("epoll_wait(): " PRERF, PREAR(err));
            break;
        }


        for (int i = 0; i < epoll_ret; i++) {
            int fd = events[i].data.fd;

            if (fd == state->tcp_fd) {
                /*
                 * A new client is connecting to us...
                 */
                if (accept_new_client(fd, state) < 0) {
                    ret = -1;
                    goto out;
                }
                continue;
            }

            /*
             * We have event(s) from client, let's call `recv()` to read it.
             */
            handle_client_event(fd, events[i].events, state);
        }
    }

out:
    return ret;
}


static int init_epoll(struct tcp_state *state)
{
    int err;
    int epoll_fd;

    printf("Initializing epoll_fd...\n");

    /* The epoll_create argument is ignored on modern Linux */
    epoll_fd = epoll_create(255);
    if (epoll_fd < 0) {
        err = errno;
        printf("epoll_create(): " PRERF, PREAR(err));
        return -1;
    }

    state->epoll_fd = epoll_fd;
    return 0;
}


static int init_socket(struct tcp_state *state)
{
    int ret;
    int err;
    int tcp_fd = -1;
    struct sockaddr_in addr;
    socklen_t addr_len = sizeof(addr);
    const char *bind_addr = "0.0.0.0";
    uint16_t bind_port = 1234;

    printf("Creating TCP socket...\n");
    tcp_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
    if (tcp_fd < 0) {
        err = errno;
        printf("socket(): " PRERF, PREAR(err));
        return -1;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(bind_port);
    addr.sin_addr.s_addr = inet_addr(bind_addr);

    ret = bind(tcp_fd, (struct sockaddr *)&addr, addr_len);
    if (ret < 0) {
        ret = -1;
        err = errno;
        printf("bind(): " PRERF, PREAR(err));
        goto out;
    }

    ret = listen(tcp_fd, 10);
    if (ret < 0) {
        ret = -1;
        err = errno;
        printf("listen(): " PRERF, PREAR(err));
        goto out;
    }

    /*
     * Add `tcp_fd` to epoll monitoring.
     *
     * If epoll returned tcp_fd in `events` then a client is
     * trying to connect to us.
     */
    ret = my_epoll_add(state->epoll_fd, tcp_fd, EPOLLIN | EPOLLPRI);
    if (ret < 0) {
        ret = -1;
        goto out;
    }

    printf("Listening on %s:%u...\n", bind_addr, bind_port);
    state->tcp_fd = tcp_fd;
    return 0;
out:
    close(tcp_fd);
    return ret;
}


static void init_state(struct tcp_state *state)
{
    const size_t client_slot_num = sizeof(state->clients) / sizeof(*state->clients);
    const uint16_t client_map_num = sizeof(state->client_map) / sizeof(*state->client_map);

    for (size_t i = 0; i < client_slot_num; i++) {
        state->clients[i].is_used = false;
        state->clients[i].client_fd = -1;
    }

    for (uint16_t i = 0; i < client_map_num; i++) {
        state->client_map[i] = EPOLL_MAP_TO_NOP;
    }
}


int main(void)
{
    int ret;
    struct tcp_state state;

    init_state(&state);

    ret = init_epoll(&state);
    if (ret != 0)
        goto out;


    ret = init_socket(&state);
    if (ret != 0)
        goto out;


    state.stop = false;

    ret = event_loop(&state);

out:
    /*
     * You should write a cleaner here.
     *
     * Close all client file descriptors and release
     * some resources you may have.
     *
     * You may also want to set interrupt handler
     * before the event_loop.
     *
     * For example, if you get SIGINT or SIGTERM
     * set `state->stop` to true, so that it exits
     * gracefully.
     */
    return ret;
}

test.php (a PHP script that simulates a client)

I was too lazy to write a TCP socket client in C, so I used PHP.

<?php

function main(): int
{
    $sock = socket_create(AF_INET, SOCK_STREAM, 0);
    $conn = socket_connect($sock, "127.0.0.1", 1234);
    if (!$conn)
        return 1;
    socket_write($sock, "AAAAAAAA\n", 9);
    sleep(1);
    socket_write($sock, "BBBBBBBB\n", 9);
    sleep(1);
    socket_write($sock, "CCCCCCCC\n", 9);
    sleep(1);
    socket_close($sock);
    return 0;
}

exit(main());
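For readers who would rather stay in C, here is a rough counterpart of test.php. It is a sketch rather than part of the original answer: `run_client` is a hypothetical name, and the delay is a parameter so that it can be set to 1 to mimic the PHP script.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical C counterpart of test.php: connect to ip:port, send three
 * lines with `delay_sec` seconds of sleep after each one, then close.
 * Returns 0 on success, -1 on failure.
 */
int run_client(const char *ip, uint16_t port, unsigned int delay_sec)
{
    static const char *lines[] = { "AAAAAAAA\n", "BBBBBBBB\n", "CCCCCCCC\n" };
    struct sockaddr_in addr;
    int ret = -1;
    int sock = socket(AF_INET, SOCK_STREAM, 0);

    if (sock < 0)
        return -1;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    if (inet_pton(AF_INET, ip, &addr.sin_addr) != 1)
        goto out;
    if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0)
        goto out;

    for (size_t i = 0; i < sizeof(lines) / sizeof(*lines); i++) {
        size_t len = strlen(lines[i]);

        /* A short write is treated as failure to keep the sketch small. */
        if (send(sock, lines[i], len, MSG_NOSIGNAL) != (ssize_t)len)
            goto out;
        sleep(delay_sec);
    }
    ret = 0;
out:
    close(sock);
    return ret;
}
```

Against the server above, the equivalent call would be `run_client("127.0.0.1", 1234, 1)`.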


Compile and run the server
ammarfaizi2@integral:~/ex$ gcc -Wall -Wextra -ggdb3 server.c -o server
ammarfaizi2@integral:~/ex$ ./server
Initializing epoll_fd...
Creating TCP socket...
Listening on 0.0.0.0:1234...
Entering event loop...

Simulate multiple clients with test.php

ammarfaizi2@integral:~$ for i in {1..10}; do php test.php & done;
[1] 14214
[2] 14215
[3] 14216
[4] 14217
[5] 14218
[6] 14219
[7] 14220
[8] 14221
[9] 14222
[10] 14223
ammarfaizi2@integral:~$ 

Server output while the clients are connected

ammarfaizi2@integral:~/ex$ ./server
Initializing epoll_fd...
Creating TCP socket...
Listening on 0.0.0.0:1234...
Entering event loop...
Client 127.0.0.1:60866 has been accepted!
Client 127.0.0.1:60866 sends: "AAAAAAAA"
Client 127.0.0.1:60868 has been accepted!
Client 127.0.0.1:60868 sends: "AAAAAAAA"
Client 127.0.0.1:60870 has been accepted!
Client 127.0.0.1:60872 has been accepted!
Client 127.0.0.1:60870 sends: "AAAAAAAA"
Client 127.0.0.1:60872 sends: "AAAAAAAA"
Client 127.0.0.1:60874 has been accepted!
Client 127.0.0.1:60874 sends: "AAAAAAAA"
Client 127.0.0.1:60876 has been accepted!
Client 127.0.0.1:60878 has been accepted!
Client 127.0.0.1:60878 sends: "AAAAAAAA"
Client 127.0.0.1:60876 sends: "AAAAAAAA"
Client 127.0.0.1:60880 has been accepted!
Client 127.0.0.1:60880 sends: "AAAAAAAA"
Client 127.0.0.1:60882 has been accepted!
Client 127.0.0.1:60882 sends: "AAAAAAAA"
Client 127.0.0.1:60884 has been accepted!
Client 127.0.0.1:60884 sends: "AAAAAAAA"
Client 127.0.0.1:60866 sends: "BBBBBBBB"
Client 127.0.0.1:60868 sends: "BBBBBBBB"
Client 127.0.0.1:60870 sends: "BBBBBBBB"
Client 127.0.0.1:60872 sends: "BBBBBBBB"
Client 127.0.0.1:60874 sends: "BBBBBBBB"
Client 127.0.0.1:60878 sends: "BBBBBBBB"
Client 127.0.0.1:60876 sends: "BBBBBBBB"
Client 127.0.0.1:60880 sends: "BBBBBBBB"
Client 127.0.0.1:60882 sends: "BBBBBBBB"
Client 127.0.0.1:60884 sends: "BBBBBBBB"
Client 127.0.0.1:60866 sends: "CCCCCCCC"
Client 127.0.0.1:60868 sends: "CCCCCCCC"
Client 127.0.0.1:60870 sends: "CCCCCCCC"
Client 127.0.0.1:60872 sends: "CCCCCCCC"
Client 127.0.0.1:60874 sends: "CCCCCCCC"
Client 127.0.0.1:60878 sends: "CCCCCCCC"
Client 127.0.0.1:60876 sends: "CCCCCCCC"
Client 127.0.0.1:60880 sends: "CCCCCCCC"
Client 127.0.0.1:60882 sends: "CCCCCCCC"
Client 127.0.0.1:60884 sends: "CCCCCCCC"
Client 127.0.0.1:60866 has closed its connection
Client 127.0.0.1:60868 has closed its connection
Client 127.0.0.1:60870 has closed its connection
Client 127.0.0.1:60872 has closed its connection
Client 127.0.0.1:60874 has closed its connection
Client 127.0.0.1:60878 has closed its connection
Client 127.0.0.1:60876 has closed its connection
Client 127.0.0.1:60880 has closed its connection
Client 127.0.0.1:60882 has closed its connection
Client 127.0.0.1:60884 has closed its connection
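The "has closed its connection" lines come from the close_conn path in handle_client_event, which is reached when recv returns 0 or when EPOLLERR/EPOLLHUP is set. The sketch below isolates the first case. To stay self-contained it uses an AF_UNIX socketpair instead of a TCP connection, which is an assumption made for the demo; `recv_after_peer_close` is a hypothetical name.

```c
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical demo: create a connected socket pair, close one end, and
 * return what recv() reports on the other end once epoll says it is ready.
 * Returns the recv() result (0 means "peer closed"), or -2 on setup failure.
 */
long recv_after_peer_close(void)
{
    int pair[2];
    char byte;
    long result = -2;
    struct epoll_event ev = { .events = EPOLLIN | EPOLLRDHUP };
    struct epoll_event out;
    int epoll_fd = epoll_create1(0);

    if (epoll_fd < 0)
        return -2;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, pair) < 0) {
        close(epoll_fd);
        return -2;
    }

    ev.data.fd = pair[0];
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pair[0], &ev) < 0)
        goto out;

    close(pair[1]); /* the "client" goes away */
    pair[1] = -1;

    /* The close makes pair[0] ready; a 1 s timeout guards against hangs. */
    if (epoll_wait(epoll_fd, &out, 1, 1000) == 1)
        result = (long)recv(pair[0], &byte, sizeof(byte), 0);
out:
    if (pair[1] >= 0)
        close(pair[1]);
    close(pair[0]);
    close(epoll_fd);
    return result;
}
```

A return value of 0 is the end-of-stream signal that server.c treats as a disconnect.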

Thanks, this is a very detailed answer. I may have some questions about integrating this with async, but they will have to wait until later today. - FreelanceConsultant

Assuming you have already opened the server socket, you can use the following code. It is based on the example in man epoll.
// assume you have opened listen_sock properly
// int listen_sock;
// 
int epollfd = epoll_create1(0);
if (epollfd == -1) {
  exit(1);
}

struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
  exit(2);
}

for (;;) {
  #define MAX_EVENTS 64
  struct epoll_event events[MAX_EVENTS];
  int events_count = epoll_wait(epollfd, events, MAX_EVENTS, -1);
  if (events_count == -1) {
    exit(3);
  }

  for (int n = 0; n < events_count; ++ n) {
    if (events[n].data.fd == listen_sock) {
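      /* sockaddr_un is declared in <sys/un.h> */
      struct sockaddr_un addr;
      socklen_t addrlen = sizeof(addr); /* accept() reads this as the buffer size */
      int socket = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);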
    }
  }
}

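I edited your answer and changed x to n in the for loop, which I assume is what you meant? Very useful, thanks. It hadn't occurred to me to look in the man pages for an example. - FreelanceConsultant
By the way, which header does sockaddr_un need? And why use it rather than sockaddr_in? - FreelanceConsultant
@FreelanceConsultant It is a Unix domain socket address. See https://www.man7.org/linux/man-pages/man7/unix.7.html - Shawn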
