使用管道、tee()和splice()向多个套接字发送数据

13
我正在使用tee()来复制一个“主”管道,以便使用splice()写入多个套接字。由于我可以将数据通过splice()函数传输到目标套接字的速度不同,因此这些管道将以不同的速率被清空。因此,当我下一次尝试向“主”管道添加数据并再次使用tee()函数时,可能会出现这样一种情况:我可以将64KB的数据写入管道,但只能将4KB的数据复制到其中一个“从”管道中。我猜想,如果我将整个“主”管道都通过splice()函数传输到套接字,那么我将永远无法将剩余的60KB复制到该“从”管道中。是这样吗?我认为我可以跟踪一个tee_offset(从0开始),将其设置为“未复制”的数据的开头,然后不要在splice()函数中超过它。所以在这种情况下,我会将tee_offset设置为4096,并且在能够将所有数据复制到其他管道之前,不会超过这个范围。我偏离了正确的轨道吗?您有什么提示/警告给我吗?
1个回答

26
如果我理解正确,您有一些实时数据源,想要将其复用到多个套接字。您已经连接了一个“源”管道到任何产生数据的地方,并且对于每个您希望发送数据的套接字都有一个“目标”管道。您正在使用tee()从源管道复制数据到每个目标管道,并使用splice()将其从目标管道复制到套接字本身。
您将遇到的根本问题是,如果其中一个套接字无法跟上 - 如果您产生的数据比您发送的速度更快,那么您将会遇到问题。这与您使用管道无关,这只是一个根本性的问题。因此,您需要选择一种策略来应对这种情况 - 即使您不希望它很常见,我建议您处理这个问题,因为这些问题通常会在以后困扰您。您的基本选择是关闭有问题的套接字,或者跳过数据直到它清除其输出缓冲区 - 后者可能更适合音频/视频流等情况。

然而,与你使用管道相关的问题是,在Linux上管道缓冲区的大小有些不太灵活。从Linux 2.6.11开始,默认为64K(自Linux 2.6.17中添加了tee()调用)-请参见pipe manpage。自2.6.35以来,可以通过将F_SETPIPE_SZ选项传递给fcntl()(请参见fcntl manpage),将此值更改到由/proc/sys/fs/pipe-size-max指定的限制,但缓冲仍比在用户空间中动态分配的方案更难以按需更改。这意味着你应对慢速套接字的能力会受到一定限制,能否接受这种情况取决于你期望接收和发送数据的速率。

假设这个缓冲策略是可以接受的,你在正确地假设需要跟踪每个目标管道从源头消耗了多少数据,只有所有目标管道都消耗了数据,才能安全地丢弃数据。这有点复杂,因为tee()没有偏移量的概念——你只能从管道的开头复制。这意味着你只能以最慢的套接字的速度进行复制,因为在一些数据被从源头消耗之前,你不能使用tee()将数据复制到目标管道中,而在所有套接字都拥有即将消耗的数据之前,你也不能这样做。
如何处理这个问题取决于你的数据的重要性。如果你真的需要tee()和splice()的速度,并且你相信慢速套接字是极为罕见的事件,你可以像这样做(我假设你正在使用非阻塞IO和单个线程,但类似的方法也适用于多个线程):
  1. 确保所有管道都是非阻塞的(使用fcntl(d, F_SETFL, O_NONBLOCK)使每个文件描述符都变为非阻塞)。
  2. 为每个目标管道初始化一个read_counter变量,初始值为零。
  3. 使用类似epoll()的方法等待源管道中有数据。
  4. 循环遍历所有read_counter为零的目标管道,调用tee()函数将数据传输到每个管道。确保在flags参数中传递SPLICE_F_NONBLOCK
  5. 通过tee()传输的数据量增加到每个目标管道的read_counter变量中,并跟踪最小值。
  6. 找到read_counter的最小值 - 如果这个值不为零,则从源管道中丢弃相应数量的数据(例如使用一个打开在/dev/null上的目标进行splice()调用)。在丢弃数据之后,从所有管道的read_counter中减去已丢弃的数量(因为这是最小值,所以这不会导致任何一个管道的计数器变为负数)。
  7. 返回第3步。
注意:过去让我犯了困扰的一件事是,SPLICE_F_NONBLOCK会影响管道上的tee()splice()操作是否为非阻塞方式,而您使用fnctl()设置的O_NONBLOCK则会影响与其他调用(例如read()write())的交互是否为非阻塞方式。如果您希望所有内容都是非阻塞的,请同时设置两者。还要记得将您的套接字设置为非阻塞方式,否则splice()调用传输数据到它们时可能会发生阻塞(除非这正是您想要的,如果您正在使用线程方法)。
如您所见,这种策略存在一个主要问题 - 一旦一个套接字被阻塞,所有内容都会停止 - 该套接字的目标管道将填满,然后源管道将变得停滞不前。因此,如果在第4步中tee()返回EAGAIN,则您需要关闭该套接字,或者至少“断开”它(即将其从循环中取出),直到其输出缓冲区为空为止,以便您不再向其写入任何其他内容。选择哪个取决于数据流是否能够从跳过其中一部分中恢复过来。
如果您想更优雅地处理网络延迟,那么您需要做更多的缓冲,这将涉及用户空间缓冲区(这相当于抵消了tee()splice()的优势)或可能是基于磁盘的缓冲。基于磁盘的缓冲几乎肯定比用户空间缓冲慢得多,因此不适合,因为您首先选择了tee()splice(),可以推断您想要更快的速度,但我提到它是为了完整性。
如果您最终需要从用户空间插入数据,则值得注意的一点是vmsplice()调用,它可以将来自用户空间的“收集输出”传输到管道中,类似于writev()调用。如果您正在进行足够的缓冲以将数据分配在多个不同的分配缓冲区中(例如,如果您使用池分配器方法),那么这可能非常有用。
最后,您可以想象在“快速”方案(使用tee()splice())之间交换套接字,如果它们无法跟上,则将它们移动到较慢的用户空间缓冲区。这会使您的实现变得复杂,但如果您处理大量连接,并且其中只有很少一部分是缓慢的,则仍然可以减少涉及到用户空间的复制量。但是,这只能是应对瞬时网络问题的短期措施 - 正如我最初所说的,如果您的套接字比源慢,则存在根本问题。您最终会达到某些缓冲限制并需要跳过数据或关闭连接。
总的来说,你需要仔细考虑为什么需要tee()splice()的速度,以及对于你的用例来说,是否简单地在内存或磁盘上使用用户空间缓冲更合适。然而,如果你确信速度始终很快,并且有限的缓冲是可以接受的,那么我上面概述的方法应该可以解决问题。
此外,我要提到的一件事是,这将使你的代码极度依赖于Linux - 我不知道其他Unix变体是否支持这些调用。 sendfile()调用比splice()更受限制,但可能更具可移植性。如果你真的希望代码具有可移植性,请坚持使用用户空间缓冲。
如果有任何我涉及的内容需要更详细的解释,请告诉我。

5
我希望我能给你的回答加10分。是的,你很好地描述了我的问题,并且如果一个套接字无法跟上,你对问题的看法也是正确的。每个套接字应该以相同的速度传输,除非一个接收器出现问题,这时唯一明智的做法是将其从复制集中删除。但是你错过的一点是,虽然管道默认情况下是64KB,但是你可以使用fcntl将它们增加到1MB(这个限制本身也可以通过修改/proc/sys/fs/pipe-max-size来提高)。我有足够的内存,可以为每个管道分配多达64MB的空间。你觉得呢? - Eloff
我之前从未了解过 F_SETPIPE_SZ,谢谢!我已经编辑了我的回答。请记住,2.6.35 仍然有点新(例如,Ubuntu 10.04 LTS 是 2.6.32 AFAIK)。只要您不介意针对 Linux,这种方法似乎是没问题的。我建议尽可能限制代码范围,以免出现针对 Linux 特定的情况。还要记住的是,这只是解决方案的一个方面——如果性能很重要,建议尝试使用非阻塞 IO vs. 线程 vs. 进程来看哪个最适合您。管道的好处之一是,如果需要,它们在 fork() 跨进程时可正常工作。 - Cartroo
1
有一件事我忘了提 - 多进程方法可能看起来是提高当今多核系统性能的明显方法,但请记住会有很多内存共享,所以这并不是简单的。例如,在 NUMA 架构(例如 AMD Opterons)上,频繁访问相同内存的多个核心可能会导致性能下降。即使在 SMP 系统上,如果瓶颈是内存总线,也不清楚多进程是否会带来任何好处。 - Cartroo
1
是的,实际上我认为一个核心用于编写和连接管道,另一个使用 epoll 拼接到套接字会很有效。我认为在 CPU 甚至内存总线遇到瓶颈之前,10GB 网卡将首先面对它。但拼接/连接架构将为实际处理数据留下更多的 CPU 资源,这非常好。 - Eloff
仅仅点赞这个答案是不够的,我必须评论一下:分析和建议非常出色,清晰易懂。在我的玩具程序中,我正在尝试使用tee()/splice()(utee,基本上是一个试图像tee一样的程序:https://github.com/aktau/utee)。我还没有考虑过你的建议,但正在思考它(我已经计算了tee()传输的最小量,但没有使用它)。你的答案清楚地指出了我应该做什么。 - Aktau
这是一个非常出色的问题分析,确实是一个很棒的答案。非常感谢您的分享。 - alecov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接