我想同时等待文件描述符和互斥锁,有什么推荐的方法吗?

23
我想要创建线程来执行特定任务,并使用线程安全的队列与它们进行通信。同时,我希望在等待时对各种文件描述符进行IO操作。
如何推荐实现这一目标?我需要创建线程间管道并在队列从无元素变为有元素时进行写入吗?难道没有更好的方法吗?
如果我必须创建线程间管道,为什么不是更多实现共享队列的库允许您将共享队列和线程间管道作为单个实体创建呢?
我要求两种语言下的解答:C++和Python。我稍微关心跨平台解决方案,但主要关注Linux。
为了更具体说明问题...
我有一些代码将在文件系统树中搜索内容。我通过套接字打开了几个与外界通信的渠道。可能会导致需要在文件系统树中搜索内容的请求将到达。
我将把搜索文件系统树的代码隔离在一个或多个线程中。我想将需要搜索树的请求放入一个线程安全的队列中,由搜索线程执行。结果将放入一个已完成搜索的队列中。
我想能够快速处理所有非搜索请求,同时进行搜索。我希望能够及时处理搜索结果。
处理传入的请求通常意味着使用epoll的事件驱动架构。磁盘搜索请求队列和返回结果队列则使用互斥锁或信号量实现线程安全。
等待空队列的标准方法是使用条件变量。但如果我需要在等待时处理其他请求,那么这种方式就行不通了。要么我一直轮询结果队列(平均延迟结果的一半),要么阻塞而不服务请求。

你说线程,但接着又谈到共享内存和管道。你是想要分叉进程吗? - Brian Roach
@Brian Roach - 抱歉,那有点混淆。我指的是线程。我将“共享内存队列”更改为“线程安全队列”。 - Omnifarious
队列没问题,你只需要不用担心“共享内存”……它们确实共享同一块内存——这是一个单一的进程。“管道”通常是指IPC管道。实际上,你只需要阅读如何使用锁在线程之间共享数据/对象的相关知识。 - Brian Roach
@Brian Roach - 我知道如何做。问题是我不想在等待锁时阻塞,因为我还会有文件描述符/套接字的请求进来。 - Omnifarious
8个回答

13
使用事件驱动架构时,需要有一个单一的机制来报告事件完成。在Linux上,如果使用文件,则需要使用来自select或poll系列的内容,这意味着您必须使用管道来启动所有与文件无关的事件。
编辑:Linux有eventfdtimerfd。这些可以添加到您的epoll列表中,并在另一个线程触发或定时器事件时用于退出epoll_wait
还有另一种选择,那就是信号。可以使用fcntl修改文件描述符,以便在文件描述符变为活动状态时发出信号。然后,信号处理程序可以将文件就绪消息推送到您选择的任何类型的队列中。这可以是简单的信号量或互斥体/条件变量驱动的队列。由于现在不再使用select/poll,因此不再需要使用管道来排队无文件基础的消息。
健康警告:我尚未尝试过这种方法,虽然我看不出为什么它不起作用,但我确实不知道signal方法的性能影响。
编辑:在信号处理程序中操作互斥体可能是一个非常糟糕的想法。

这也是一个有趣的想法。谢谢。 - Omnifarious
有一个需要注意的问题。如果你没有特别小心确保正确的信号没有在正确的线程中被阻塞,可能会出现死锁的情况,在将某物压入队列中时。例如,在正在从队列中取出某物的线程中收到放置某物的信号可能会非常不愉快。 - Omnifarious
是的,您将需要在向队列添加内容时阻止信号(这可能涉及系统调用)。 - doron
我选择你的答案,因为它是最有趣和不寻常的替代方法。不过我可能还是会坚持使用管道符号。 :-) - Omnifarious
1
好吧,它确实附带了健康警告 ;) - doron
显示剩余4条评论

5

我曾使用你提到的pipe()和libevent(它包装了epoll)来解决这个问题。工作线程在其输出队列从空变为非空时向其管道FD写入一个字节。这会唤醒主IO线程,然后可以获取工作线程的输出。这种方法非常简单易懂,而且效果很好。


谢谢你成为第一个理解我的问题的人!我本来希望避免使用管道,但看起来这是唯一的方法。虽然有点笨拙,但也只能如此了。 - Omnifarious
@Omnifarious - 我理解了你的问题。也许我没有表达清楚。使用 MQ,您不需要额外的管道。您将工作程序输出队列的 mqd_t 放入选择集中。当其中一个工作程序将某些内容放入输出队列时,您的主循环将收到通知。 - Duck
请确保将文件描述符设置为非阻塞模式。MQ 看起来很不错 - 我很想知道将其连接到 libevent 是否会有任何问题。 - twk
我从未使用过libevent,但如果它只是封装了epoll,那么它应该可以工作。《Man (7) mq_overview》明确指出:“在Linux上,消息队列描述符实际上是文件描述符,并且可以使用select(2),poll(2)或epoll(7)进行监视。这不是可移植的。” - Duck
我很想选择你的方法,因为那可能是我真正要做的。它的优点在于仅需要在特定情况下进行系统调用(即在非空队列变为空后)。而且这些情况在高负载时发生的可能性较小(而不是更大)。但是,我选择了最不寻常和有趣的替代方法。 - Omnifarious

4
您有Linux标签,因此我会提供以下内容:POSIX消息队列可以完成所有这些操作,如果不是您所需的跨平台请求,则应该满足您的“内置”要求。
线程安全同步是内置的。您可以让工作线程在读取队列时阻塞。或者,MQ可以使用mq_notify()在队列中放入新项时生成一个新线程(或信号现有线程)。由于看起来您将使用select(),因此MQ的标识符(mqd_t)可用作带有select的文件描述符。

其实,仔细想想,这个方法可能行得通。我原本认为必须将所有数据序列化,但是我可以直接将一个原始指针放入队列中。虽然这会让引用计数内存管理变得相当有趣。:-) 当然,我也可以将原始指针直接放入管道中。 - Omnifarious
如果您在同一进程中,可以传递原始指针,前提是它表示所有权转移给了新线程(旧线程不再允许访问数据)。如果您跨进程工作,则序列化变得很重要。请记住,与管道一样,消息队列是完整的内核跨进程内核对象,因此,与其他ITC机制不同,任何访问它们都将涉及系统调用。 - doron
@doron 我认为 Omnifarious 进行了大量的文件和网络 I/O,因此系统调用将是无关紧要的。当然,我也认为他只返回文件名或类似的东西,而不是复杂的结构或变量数组。无论是 MQ 还是手工构建,你几乎都被迫传递指针(在进程内)。 - Duck
@Duck:我正在返回一个复杂的结构。但是我可以只返回文件名,让主线程读取复杂的结构。不过我认为,这将显著降低在另一个线程中使用文件系统IO的效用。我正在将文件系统用作键值存储。最终,我可能还会使用DHT和/或NoSQL数据库。并且fs访问线程可能需要传回“我不知道,从你正在交谈的人那里获取它”。 - Omnifarious
你也有一个有趣的方法。将队列转换为文件描述符,这样你就不会有两种根本不同的对象了。我并不认为 POSIX 消息队列有多好。所有的 POSIX IPC 机制都存在一个问题,即创建没有文件系统名称的持久资源,并需要特殊工具(比如 ipcs 命令)来管理。消息队列唯一真正提供的优势是持久性和消息边界的保留,超过管道所能提供的。 - Omnifarious
显示剩余2条评论

4

似乎还没有人提到这个选项:

不要在“主线程”中运行select/poll/等操作。启动一个专用的辅助线程来执行I/O操作,并在I/O操作完成时将通知推送到您的线程安全队列(与其他线程用于与主线程通信的相同队列)。然后,您的主线程只需要等待通知队列即可。


3
在我看来,鸭子和twk的回答比OP选择的doron更好。 doron建议从信号处理程序的上下文中写入消息队列,并声明消息队列可以是“任何类型的队列”。我强烈警告您不要这样做,因为许多C库/系统调用不能安全地从信号处理程序中调用(请参见async-signal-safe)。
特别是,如果您选择一个由互斥锁保护的队列,则不应从信号处理程序中访问它。考虑以下情况:您的消费者线程锁定队列以读取它。立即之后,内核传递信号通知您现在有数据的文件描述符。您的信号处理程序在消费者线程中运行(必要时),并尝试将某些内容放入您的队列中。为此,它首先必须获取锁定。但它已经持有锁定,所以现在您已经死锁了。
在我的经验中,选择/select和轮询/poll是UNIX/Linux事件驱动程序中唯一可行的解决方案。我希望在多线程程序内有更好的方式,但您需要某种机制来“唤醒”消费者线程。我还没有找到不涉及系统调用的方法(因为在任何阻塞调用(例如select)期间,消费者线程都在内核中的等待队列上)。
编辑:我忘记提到使用select/poll处理信号的一个Linux特定方法:signalfd(2)。您可以得到一个文件描述符,可以在其上进行选择/轮询,并且处理代码通常运行,而不是在信号处理程序的上下文中。

我在注释中提到了这一点。在获取互斥锁之前,您必须阻止相关信号。也许我应该选择@twk的答案,因为那是我实际使用的。不过,学习signalfd也很好。 :-) - Omnifarious
哇!signalfd() 真是太棒了!我一直想知道为什么在 Unix 下 signal() 接口还是那么糟糕,但有了 signalfd(),一切都变得轻而易举。 - Alexis Wilke

2
这是一个非常常见的问题,特别是当您正在开发网络服务器端程序时。大多数Linux服务器端程序的主要外观循环将如下所示:
epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

这是一个基于 epoll 的单线程服务器框架(主线程)。问题在于它是单线程的,不支持多线程。因此,要求 proc() 永远不会阻塞或运行太长时间(例如对于常见情况不超过 10 毫秒)。
如果 proc() 将会运行很长时间,我们需要多线程,并在单独的线程中执行 proc()。
我们可以使用基于互斥锁的消息队列将任务提交给工作线程,而不会阻塞主线程,速度足够快。
epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

那么我们需要一种方法从工作线程中获取任务结果。怎么做呢?如果我们直接在epoll_wait()之前或之后检查消息队列会怎样?
epoll_add(serv_sock);
while(1){
    ret = epoll_wait(); // may blocks for 10ms
    resp = queue.check_result(); // fast, non blockable
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

然而,检查操作将在 epoll_wait() 结束后执行,如果 epoll_wait() 等待的所有文件描述符都未活动,通常会阻塞 10 微秒(常见情况)。
对于服务器而言,10 毫秒是相当长的时间!当任务结果生成时,我们能否向 epoll_wait() 发送信号以立即结束呢?
可以!我将介绍如何在我的一个开源项目中完成此操作:
为所有工作线程创建一个管道,并使 epoll 也等待该管道。一旦生成了任务结果,工作线程就会向管道写入一个字节,然后 epoll_wait() 几乎会在同一时间结束!- Linux 管道具有 5 至 20 微秒的延迟。
在我的项目SSDB(一种与Redis协议兼容的磁盘内NoSQL数据库)中,我创建了一个SelectableQueue,用于在主线程和工作线程之间传递消息。正如其名称所示,SelectableQueue具有文件描述符,可以通过epoll等待。
SelectableQueue: https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94 主线程中的使用方法:
epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is queue){
            sock, resp = queue->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            queue->add_task(fd, req);
        }
    }
}

在工作线程中的用法:
fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);

是的。:-( 我本来希望避免这种情况,但似乎这是必须要做的事情。我和某个人交谈时得知,曾经有一个版本的 futex 调用接受文件描述符,并且 epoll 可以等待该文件描述符,直到 futex 被解除阻塞。如果像 signalfd 系统调用一样添加了这个功能就好了。 - Omnifarious
@Omnifarious 看起来曾经有一个 FUTEX_FD 选项用于 futex 系统调用,允许将 fd 与 futex 关联起来,从而通过 epoll_wait 等方式方便地等待多个 futex,但它已被淘汰,因为正如 futex 手册所说,它是“固有的竞争条件”。 - Petr Skocik
@PSkocik - 我相信 Valve 正在尝试对 futex 调用进行更改,以支持类似于 Proton(也称为游戏的 WINE)的东西。许多 Windows 程序依赖于大量事件,并使用其自己的 fd 对每个事件进行建模会耗尽进程中的 fds。因此,他们需要一种更好的处理方式,仍然可以轻松地同时等待多个事件。 - Omnifarious
@Omnifarious 我相当确定我将使用基于eventfd/socketpair/pipe的信号量/条件变量,因为它们可以与其他文件描述符一起进行轮询。信号量和条件变量本来就是用于长时间等待的,而对于这种情况,futexes似乎过于复杂,只是为了在最后一刻避免睡眠和系统调用而付出了失去轮询能力的代价。如果避免睡眠如此重要,最好不要像这样微观优化,而是简单地自旋等待一段时间。 - Petr Skocik

1
C++11有std::mutex和std::condition_variable。这两个可以用来在满足特定条件时,一个线程通知另一个线程。听起来你需要使用这些基本元素来构建你的解决方案。如果你的环境还不支持这些C++11库特性,你可以在boost中找到非常相似的功能。对于Python我就不太清楚了,抱歉。

0

实现您想要的功能的一种方法是通过实现观察者模式

您可以将主线程注册为所有生成的线程的观察者,并在它们完成它们应该做的事情(或在运行期间使用您需要的信息进行更新)时通知它。

基本上,您想要将您的方法更改为事件驱动模型。


这样做会导致观察者通知在执行工作的线程上下文中运行,对吗?这是否意味着通知访问的任何对象也必须是线程安全的,以免主线程和工作线程相互干扰? - Omnifarious
主线程中被子线程调用的方法需要有一个锁(由主线程拥有),但你只需要在该方法中执行所需操作的时间内进行锁定。 - Brian Roach
这就归结于线程同步中没有免费的午餐 :) 当可能时,您可以使用非阻塞调用/锁检查尽可能地减少争用...但最终,如果您需要在多个线程中修改某些内容,则需要以某种方式保护它。 - Brian Roach

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接