在多线程中共享同一个epoll文件描述符是否可行?

8

在多个线程之间共享相同的Epoll文件描述符(而不是套接字文件描述符)是否安全?如果可以共享,则每个线程是否需要将自己的事件数组传递给epoll_wait(2),还是它们可以共享它?

例如

    void *thread_func(void *thread_args) {
      // extract socket_fd, epoll_fd, &event, &events_array from 
      //     thread_args
      // epoll_wait() using epoll_fd and events_array received from main
      // now all threads would be using same epoll_fd and events array 
    }

    void main( void ) {
      // create and bind to socket
      // create events_fd
      // allocate memory for events array
      // subscribe to events EPOLLIN and EPOLLET
      // pack the socket_fd, epoll_fd, &events, &events_array into 
      //   thread_args struct.

      // create multiple threads and pass thread_func and 
      //   same thread_args to all threads
    }

还是这样做更好:

    void *thread_func(void *socket_fd) {
      // create events_fd
      // allocate memory for events array
      // subscribe to events EPOLLIN and EPOLLET
      // epoll_wait using own epoll_fd and events_array
      // now all threads would have a separate epoll_fd with 
      //   events populated on its own array
   }

   void main(void) {
     // create and bind to socket

     //create multiple threads and pass thread_func and socket_fd to 
     //  all threads
   }

有没有一个在C语言中做到这一点的好例子?我看到的例子都是在main()函数中运行事件循环,并在检测到事件时生成一个新线程来处理请求。我想要做的是在程序启动时创建特定数量的线程,并使每个线程运行事件循环并处理请求。

1个回答

19
共享同一个Epoll fd在多个线程之间是安全的,但是需要小心处理。推荐使用EPOLLET(边缘触发模式,而不是默认的水平触发模式)来避免其他线程中出现虚假唤醒。如果使用水平触发模式,当新的事件可以处理时,每个线程都会被唤醒。因为只有一个线程会处理它,这会导致大多数线程不必要地被唤醒。每个线程都需要有自己的events数组,否则可能会出现竞争条件和错误。无论是使用共享的Epoll fd还是为每个线程分配一个Epoll fd,都能够很好地工作,但语义不同。下面是一个例子展示如何使用共享的Epoll fd建立一个Echo服务器并且同时监听3000端口,并通过20个线程运行Epoll来接受和服务请求。
#include <stdio.h>
#include <stdlib.h>
#include <inttypes.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

#define SERVERPORT 3000
#define SERVERBACKLOG 10
#define THREADSNO 20
#define EVENTS_BUFF_SZ 256

static int serversock;
static int epoll_fd;
static pthread_t threads[THREADSNO];

int accept_new_client(void) {

    int clientsock;
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    if ((clientsock = accept(serversock, (struct sockaddr *) &addr, &addrlen)) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN+1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        close(clientsock);
        return -1;
    }

    printf("*** [%p] Client connected from %s:%" PRIu16 "\n", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port));

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = clientsock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientsock, &epevent) < 0) {
        perror("epoll_ctl(2) failed attempting to add new client");
        close(clientsock);
        return -1;
    }

    return 0;
}

int handle_request(int clientfd) {
    char readbuff[512];
    struct sockaddr_in addr;
    socklen_t addrlen = sizeof(addr);
    ssize_t n;

    if ((n = recv(clientfd, readbuff, sizeof(readbuff)-1, 0)) < 0) {
        return -1;
    }

    if (n == 0) {
        return 0;
    }

    readbuff[n] = '\0';

    if (getpeername(clientfd, (struct sockaddr *) &addr, &addrlen) < 0) {
        return -1;
    }

    char ip_buff[INET_ADDRSTRLEN+1];
    if (inet_ntop(AF_INET, &addr.sin_addr, ip_buff, sizeof(ip_buff)) == NULL) {
        return -1;
    }

    printf("*** [%p] [%s:%" PRIu16 "] -> server: %s", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port), readbuff);

    ssize_t sent;
    if ((sent = send(clientfd, readbuff, n, 0)) < 0) {
        return -1;
    }

    readbuff[sent] = '\0';

    printf("*** [%p] server -> [%s:%" PRIu16 "]: %s", (void *) pthread_self(),
           ip_buff, ntohs(addr.sin_port), readbuff);

    return 0;
}

void *worker_thr(void *args) {
    struct epoll_event *events = malloc(sizeof(*events)*EVENTS_BUFF_SZ);
    if (events == NULL) {
        perror("malloc(3) failed when attempting to allocate events buffer");
        pthread_exit(NULL);
    }

    int events_cnt;
    while ((events_cnt = epoll_wait(epoll_fd, events, EVENTS_BUFF_SZ, -1)) > 0) {
        int i;
        for (i = 0; i < events_cnt; i++) {
            assert(events[i].events & EPOLLIN);

            if (events[i].data.fd == serversock) {
                if (accept_new_client() == -1) {
                    fprintf(stderr, "Error accepting new client: %s\n",
                        strerror(errno));
                }
            } else {
                if (handle_request(events[i].data.fd) == -1) {
                    fprintf(stderr, "Error handling request: %s\n",
                        strerror(errno));
                }
            }
        }
    }

    if (events_cnt == 0) {
        fprintf(stderr, "epoll_wait(2) returned 0, but timeout was not specified...?");
    } else {
        perror("epoll_wait(2) error");
    }

    free(events);

    return NULL;
}

int main(void) {
    if ((serversock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
        perror("socket(2) failed");
        exit(EXIT_FAILURE);
    }

    struct sockaddr_in serveraddr;
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(SERVERPORT);
    serveraddr.sin_addr.s_addr = INADDR_ANY;

    if (bind(serversock, (const struct sockaddr *) &serveraddr, sizeof(serveraddr)) < 0) {
        perror("bind(2) failed");
        exit(EXIT_FAILURE);
    }

    if (listen(serversock, SERVERBACKLOG) < 0) {
        perror("listen(2) failed");
        exit(EXIT_FAILURE);
    }

    if ((epoll_fd = epoll_create(1)) < 0) {
        perror("epoll_create(2) failed");
        exit(EXIT_FAILURE);
    }

    struct epoll_event epevent;
    epevent.events = EPOLLIN | EPOLLET;
    epevent.data.fd = serversock;

    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, serversock, &epevent) < 0) {
        perror("epoll_ctl(2) failed on main server socket");
        exit(EXIT_FAILURE);
    }

    int i;
    for (i = 0; i < THREADSNO; i++) {
        if (pthread_create(&threads[i], NULL, worker_thr, NULL) < 0) {
            perror("pthread_create(3) failed");
            exit(EXIT_FAILURE);
        }
    }

    /* main thread also contributes as worker thread */
    worker_thr(NULL);

    return 0;
}

几个注意点:

  • main() 应该返回 int 而不是你在示例中展示的 void
  • 总是处理错误返回代码。忽略它们非常常见,但当出现问题时很难知道发生了什么。
  • 代码假设没有请求大于 511 字节(如在 handle_request() 中看到的缓冲区大小)。如果一个请求大于这个值,可能会导致一些数据在套接字中等待很长时间,因为 epoll_wait(2) 只有在文件描述符上发生新事件时才会报告它(因为我们使用了 EPOLLET)。在最坏的情况下,客户端可能永远不会实际发送任何新数据,并且一直等待回复。
  • 打印每个请求的线程标识符的代码假定 pthread_t 是一个不透明指针类型。确实,在 Linux 中,pthread_t 是一个指针类型,但在其他平台上它可能是一个整数类型,所以这不是可移植的。然而,这可能并不是太大的问题,因为 epoll 是 Linux 特有的,所以代码本身就不具备可移植性。
  • 它假设当一个线程仍在为客户端提供服务的请求时,没有来自同一客户端的其他请求。如果在此期间出现新的请求并且另一个线程开始为其提供服务,则存在竞争条件,客户端将不能保证按照发送顺序接收回显消息(但是 write(2) 是原子的,所以回复可能是无序的,但不会交错)。

感谢您提供如此全面的答案,这对我非常有帮助。 - MiJo
@MiJo 很高兴我能帮到你。这是一个很好的问题 :) - Filipe Gonçalves
@Filipe Gonçalves 我有一个问题:在多线程中共享 epoll fd 时,它是如何处理负载均衡的? - sundq

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接