在C语言中使用工作线程的epoll IO

11

我正在编写一个小型服务器,它将从多个源接收数据并处理这些数据。这些源和接收到的数据都很重要,但 epoll 应该能够很好地处理。然而,所有接收到的数据都必须被解析并通过大量测试,这是耗时的,并且尽管使用 epoll 多路复用,仍会阻塞单个线程。基本上,模式应该像以下内容:IO 循环接收数据并将其打包成作业,发送给池中可用的第一个线程,该捆绑包由作业处理,结果传递回 IO 循环以写入文件。

我已决定使用一个 IO 线程和 N 个工作线程。使用提供的示例实现接受 tcp 连接和读取数据的 IO 线程非常容易,参见: http://linux.die.net/man/7/epoll

线程通常也很容易处理,但我正在努力以优雅的方式将 epoll IO 循环与线程池相结合。我无法在网上找到任何关于使用带有 worker 池的 epoll 的“最佳实践”,但有很多关于同一主题的问题。

因此,我有一些问题,希望有人能帮我回答:

  1. 可以(并且应该)使用 eventfd 作为 IO 线程和所有工作线程之间的双向同步机制吗?例如,每个 worker 线程是否应具有自己的 epoll 例程,等待共享事件 (带有包含数据/关于作业信息的结构体指针) ,即以某种方式使用 eventfd 作为作业队列?还可以从多个 worker 线程传递另一个 eventfd 将结果传回 IO 线程吗?
  2. 在套接字上通知 IO 线程有更多数据后,接收实际数据是应该在 IO 线程上进行,还是应该由工作者线程自行接收数据以避免阻塞 IO 线程,同时解析数据帧等?在这种情况下,如何确保安全性,例如如果 recv 在一个 worker 线程中读取了 1.5 帧数据,而另一个 worker 线程从同一连接接收了最后的 0.5 帧数据。
  • 如果通过互斥锁等方式实现工作线程池,当N+1个线程尝试使用相同的锁时,等待锁是否会阻塞IO线程?
  • 在epoll中如何构建具有双向通信(即从IO到工作者以及返回)的工作线程池的最佳实践模式?
  • 编辑:一种可能的解决方案是从IO循环更新环形缓冲区,在更新后将环形缓冲区索引通过共享管道发送给所有工作线程(因此将该索引的控制权交给第一个读取管道中索引的工作线程),让工作线程拥有该索引直到处理结束,然后再次通过管道将索引号发送回IO线程,从而归还控制权。

    我的应用程序仅限于Linux,因此我可以使用Linux特有的功能以最优雅的方式实现。不需要跨平台支持,但需要考虑性能和线程安全性。


    我认为我可能有一个有用的解决方案,但首先需要知道,您何时能了解单个帧/数据包的长度?它们是固定长度的吗?是否包含在数据包头中?还是只有在结束时才能确定长度?如果您早些时候就知道,就可以更轻松地将工作传递给其他线程,而不会繁忙地使用主线程,但如果您直到最后才知道,主线程不可避免地必须进行大量读取。 - Vality
    嗨,我知道在接收和迭代接收缓冲区之后的长度。不幸的是,它们不是固定长度,并且长度不出现在数据包中,但基于换行符帧。 - agnsaft
    3个回答

    7

    在我的测试中,每个线程使用一个epoll实例远远优于复杂的线程模型。如果将监听套接字添加到所有epoll实例中,则工作进程只需简单地调用 accept(2) ,并将连接授予获胜者以及处理其生命周期。

    你的工作进程可能看起来像这样:

    for (;;) {
        nfds = epoll_wait(worker->efd, &evs, 1024, -1);
    
        for (i = 0; i < nfds; i++)
            ((struct socket_context*)evs[i].data.ptr)->handler(
                evs[i].data.ptr,
                evs[i].events);
    }
    

    每个添加到 epoll 实例的文件描述符都可以与一个 struct socket_context 关联:

    void listener_handler(struct socket_context* ctx, int ev)
    {
        struct socket_context* conn;
    
        conn->fd = accept(ctx->fd, NULL, NULL);
        conn->handler = conn_handler;
    
        /* add to calling worker's epoll instance or implement some form
         * of load balancing */
    }
    
    void conn_handler(struct socket_context* ctx, int ev)
    {
        /* read all available data and process. if incomplete, stash
         * data in ctx and continue next time handler is called */
    }
    
    void dummy_handler(struct socket_context* ctx, int ev)
    {
        /* handle exit condition async by adding a pipe with its
         * own handler */
    }
    

    我喜欢这个策略,因为:

    • 设计非常简单;
    • 所有线程都是相同的;
    • 工作线程和连接是隔离的 - 不会互相干扰或在错误的工作线程中调用 read(2)
    • 不需要锁定(内核会担心 accept(2) 的同步);
    • 在没有繁忙的工作进程主动竞争 accept(2) 的情况下,有点自然地负载平衡。

    关于 epoll 的一些注意事项:

    • 使用边缘触发模式、非阻塞套接字并始终读取直到 EAGAIN
    • 避免使用 dup(2) 系列调用,以免遇到一些意外情况(epoll 注册文件描述符,但实际上监视文件描述);
    • 您可以安全地 epoll_ctl(2) 其他线程的 epoll 实例;
    • 使用大型 struct epoll_event 缓冲区进行 epoll_wait(2),以避免饥饿。

    其他注意事项:

    • 使用 accept4(2) 以节省系统调用;
    • 每个核心使用一个线程(如果受限于 CPU,则为每个物理线程;如果受限于 I/O,则为每个逻辑线程);
    • 如果连接数较低,则使用 poll(2)/select(2) 可能更快。

    希望这会有所帮助。


    1
    我喜欢这个想法,但是我担心每次接收后我的繁重工作负载会阻塞其他连接。此外,如果一个线程“幸运”地首先选择下一个接受,那么这可能会导致每个线程的工作负载不平衡。此外,如果我只有4-5个连接,我可能仍然需要30个工作线程来处理它们产生的内容。 - agnsaft
    @invictus 是的,除非你在监听器处理程序中自己平均分配连接,否则工作量不会完全平衡,这可能会增加一些复杂性。你的工作是 CPU 绑定还是 I/O 绑定?如果是 CPU 绑定,比处理器核心更多的线程只会引入更多的上下文切换。 - haste
    @pindumb 对于低线程计数(每个物理核心一个线程)不会有问题。在负载下,只有一小部分线程会见到可读的监听器。如果这是一个问题,监听器可以在线程之间轮换。但如果有数百或数千个线程,情况就会有所不同。 - haste
    我认为,虽然这个想法很有趣,但这种设计并不适合,因为我认为单个线程应该能够处理所有的IO,而实际处理过程则高度依赖于CPU。我将在14小时内添加悬赏,因为我已经寻找了多年一个好的答案,但我所找到的只是其他人也在疑惑同样的问题。希望能得到一个“教科书式的答案”,方便其他人进行搜索。 :) - agnsaft
    @invictus 酷毙了。我一直在做类似的I/O绑定工作。我尝试过1个监听线程+N个工作线程,使用单个和独立的无锁FIFO工作队列。性能下降的原因是缓存行反弹以及找出何时/如何使工作线程进入睡眠和唤醒状态。隔离确实是提高10-100倍速度的关键。然而,如果处理时间很长,则共享可写数据可能可以忽略不计。祝你好运。 :) - haste

    5
    执行此模型时,因为我们只有在完全接收数据包后才知道数据包的大小,所以不幸的是,我们无法将接收本身转移到工作线程。相反,我们仍然能做到的最好的方法是创建一个接收数据的线程,该线程必须传递指向完全接收数据包的指针。
    数据本身最好保存在循环缓冲区中,但是我们需要为每个输入源准备单独的缓冲区(如果我们获得部分数据包,则可以继续从其他源接收数据而不拆分数据)。剩下的问题是如何通知工作线程新数据包已经准备好,并给它们指向该数据包中数据的指针。由于这里的数据很少,只有一些指针,最优雅的方法是使用posix消息队列。这些队列提供了多个发送者和多个接收者编写和读取消息的能力,始终确保每个消息都被精确地1个线程接收。
    您将需要为每个数据源准备类似于以下结构体的结构体,我将介绍字段的用途。
    struct DataSource
    {
        int SourceFD;
        char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)];
        char *LatestPacket;
        char *CurrentLocation
        int SizeLeft;
    };
    

    SourceFD是指相关数据流的文件描述符,DataBuffer用于保存在处理过程中的Packet内容,它是一个循环缓冲区。LatestPacket指针用于临时存储对最新数据包的指针,以防我们接收到部分数据包并在传递数据包之前移动到另一个源。CurrentLocation存储最新数据包结束的位置,以便我们知道下一个数据包放置的位置,或者在部分接收的情况下继续进行。SizeLeft表示缓冲区剩余的空间,这将用于确定是否可以容纳数据包,或者需要重新回到开始处。
    接收函数将有效地:
    • 将数据包的内容复制到缓冲区
    • 将CurrentLocation移动到数据包末尾
    • 更新SizeLeft以减少缓冲区大小
    • 如果我们无法将数据包放入缓冲区末尾,我们会循环回到开头
    • 如果那里也没有足够的空间,则稍后重试,同时转到另一个源
    • 如果有部分接收,则将LatestPacket指针存储为指向数据包的起始位置,并转到另一个流,直到获取其余部分
    • 使用posix消息队列向工作线程发送消息,以便它可以处理数据,消息将包含指向DataSource结构的指针,以便它可以使用它,并且还需要指向正在处理的数据包及其大小的指针,这些可以在接收到数据包时计算
    工作线程将使用接收到的指针进行处理,然后增加SizeLeft,以便接收线程知道它可以继续填充缓冲区。原子函数将需要用于在结构体中处理大小值,以便我们不会在大小属性上出现竞争条件(因为它可能被工作者和IO线程同时写入,导致丢失的写入,请参见下面我的评论),它们在这里列出,并且简单而极其有用。
    现在,我已经给出了一些一般背景,但会具体解决特定的问题:
    1. 使用EventFD作为同步机制通常是一个不好的主意,你会发现自己使用了大量不必要的CPU时间,并且很难进行任何同步。特别是如果您有多个线程选择相同的文件描述符,可能会遇到严重问题。这实际上是一种恶劣的hack,有时可以工作,但无法真正替代适当的同步。
    2. 尝试像上面解释的那样卸载接收也是一个坏主意,您可以通过复杂的IPC解决此问题,但老实说,接收IO需要足够的时间才能使应用程序停顿,您的IO也很可能比CPU慢,因此使用多个线程接收几乎没有什么帮助。(这假设您没有说,拥有几个10吉比特的网络卡)。
    3. 在这里使用互斥锁或锁定是一个愚蠢的想法,它更适合于无锁编码,因为共享数据的数量很少(同时),您只是在交付工作和数据。这也将提高接收线程的性能并使您的应用程序更具可扩展性。使用此处提到的函数http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html,您可以轻松完成此操作。如果按照这种方式执行,则需要一个信号量,每次接收到数据包时都可以解锁它,并由启动作业的每个线程锁定,以允许在准备好更多数据包时动态添加更多线程,这将比使用互斥锁的自制解决方案具有更少的开销。
    4. 这里与任何线程池没有太大区别,您会生成大量线程,然后使它们全部在mq_receive上阻塞等待消息数据队列。当它们完成时,它们将其结果发送回主线程,主线程将结果消息队列添加到其epoll列表中。然后可以以此方式接收结果,对于像指针这样的小数据有效且非常高效。这也将使用很少的CPU,并且不会强制主线程浪费时间来管理工作线程。

    最后,您的编辑相当明智,除了我建议的事实之外,消息队列在这里比管道要好得多,因为它们非常有效地信号事件,保证完整的消息读取并提供自动帧。

    希望这可以帮助您,但是现在很晚,如果我漏掉了什么或者您有问题,请随时评论以进行澄清或更多解释。


    谢谢您长篇回答。我有几个问题:我可以假设多个线程可能会在同一队列上阻塞,以等待新任务吗?同样,多个线程是否可以写入另一个队列以传递完成的工作?在这种设计中,我真的需要如上所述的内置功能吗? - agnsaft
    @invictus 消息队列确实是多对多的关系,它们非常强大,因为任意数量的发送者和接收者都可以使用队列,并且消息将始终传递给一个监听线程。对于绝大多数代码来说,上述内置函数并不需要,唯一的用途是确保 SizeLeft 被原子更新,以确保接收线程和工作线程不会同时更新它并导致其损坏,例如:线程1加载值,线程2加载值,线程1写入它,线程2写入它,线程1的写入被覆盖。 - Vality
    另外,我忘了提到一点,posix消息队列保证在发送时永远不会阻塞发送方,数据传输全部异步完成。这意味着没有任何开销会减慢IO线程的速度。 - Vality
    我明白了。我认为对于我来说,共享内存队列系统是不可行的,因为我希望将结果反馈到主IO循环中,并且需要一种等待来自工作线程和IO FD的事件的方法。对我来说,你的建议仍然是最好的选择 :) - agnsaft
    @invictus 很不幸,这种管道不是线程安全的,您无法保证框架方面的任何保证。消息队列可以确保1个完整的消息到达一个线程,而如果消息超过1个字节,则可以以未定义的方式交错地进行,并且多个线程可能会每个读取数据包的一部分,从而在您尝试将大小作为指针引用时可能会立即导致segfault。消息队列实际上非常便携,几乎支持每个Posix系统。 - Vality
    显示剩余3条评论

    0

    我在其他帖子中发布了相同的答案:我想等待文件描述符和互斥锁,有什么推荐的方法?

    ==========================================================

    这是一个非常常见的问题,特别是当你正在开发网络服务器端程序时。大多数Linux服务器端程序的主循环看起来会像这样:

    epoll_add(serv_sock);
    while(1){
        ret = epoll_wait();
        foreach(ret as fd){
            req = fd.read();
            resp = proc(req);
            fd.send(resp);
        }
    }
    

    这是一个基于 epoll 的单线程(主线程)服务器框架。问题在于它是单线程的,而不是多线程的。它要求 proc() 不应该阻塞或运行很长时间(比如对于常见情况来说不超过 10 毫秒)。

    如果 proc() 运行时间很长,我们需要多线程,并在一个独立的线程(工作线程)中执行 proc()。

    我们可以使用基于互斥锁的消息队列将任务提交到工作线程,而不会阻塞主线程,速度足够快。

    然后我们需要一种方法从工作线程获取任务结果。怎么做呢?如果我们直接在 epoll_wait() 前后检查消息队列,但是检查操作将在 epoll_wait() 结束后执行,而如果所有等待的文件描述符都没有活动,epoll_wait() 通常会阻塞 10 微秒(常见情况)。

    对于服务器来说,10 毫秒已经相当长了!那么我们能否在任务结果生成时立即结束 epoll_wait() 呢?

    可以!我将在我的一个开源项目中描述如何实现。

    创建一个管道来供所有工作线程使用,而epoll也会在该管道上等待。一旦生成任务结果,工作线程就会向管道中写入一个字节,然后epoll_wait()几乎同时结束!- Linux管道的延迟为5微秒至20微秒。

    在我的项目SSDB(一种Redis协议兼容的磁盘内NoSQL数据库)中,我创建了一个SelectableQueue来在主线程和工作线程之间传递消息。正如其名称所示,SelectableQueue具有文件描述符,可以由epoll等待。

    SelectableQueue:https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

    主线程中的用法:

    epoll_add(serv_sock);
    epoll_add(queue->fd());
    while(1){
        ret = epoll_wait();
        foreach(ret as fd){
            if(fd is worker_thread){
                sock, resp = worker->pop_result();
                sock.send(resp);
            }
            if(fd is client_socket){
                req = fd.read();
                worker->add_task(fd, req);
            }
        }
    }
    

    在工作线程中的用法:

    fd, req = queue->pop_task();
    resp = proc(req);
    queue->add_result(fd, resp);
    

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接