如何检查在ZeroMQ的PUB-SUB模式中由于HWM而导致消息在发送时丢失

4
我在Linux中使用ZeroMQ(更具体地说是CZMQ)实现了一个消息总线用于IPC。这是我所实现的内容,这里
我的问题是,当发布者缓冲区已满时,我如何知道发送的消息是否丢失?
在我的简单测试设置中,我使用了一个代理来进行发布-订阅。由于发送方快而接收方非常慢,导致消息达到HWM并在发送时丢失。我的期望是,发送会失败并显示“消息已丢失”错误,但事实并非如此。即使消息被丢弃(我可以通过在订阅端看到消息间隔来验证这一点),zmq_msg_send()也没有给我任何错误。
我如何知道消息何时被丢弃?如果这是预期行为,并且ZeroMQ不让我们知道这一点,那么有什么解决方法可以找到我的发送是否丢失了消息?
2个回答

1

默认情况下,从最近的版本开始,zeromq pub/sub将高水位线 ZMQ_SNDHWM/ZMQ_RCVHWM 设置为1000条消息。

这意味着,如果您在紧密循环中突发超过1000条消息,它可能会丢失一些消息。很容易编写一个测试,并给每个消息分配一个序列号的有效负载。

一种选项是将两个HWM都设置为0。这意味着它是无限的。

您可以使用我最近编写的一些示例进行实验:

https://gist.github.com/easytiger/992b3a29eb5c8545d289 https://gist.github.com/easytiger/e382502badab49856357

将会在一个传输端口上发布和订阅大量的消息。如果你调整高水位标记,你可以看到在大量的突发情况下,如果它不是0,它将会丢失很多消息。


谢谢@easytiger。我研究了一下你建议的添加序列号来解决慢订阅者问题,特别是指南中的自杀式蜗牛模式。但我的用例有点不同。我真的不关心订阅者;我想让我的发布者知道他是否因为达到HWM而丢失了消息。 - fortytwo
2
合理的关注。考虑到PUB/SUB的操作,我认为它应该默认为无限制,并设置回调机制来通知您何时达到软水印,以便您可以创建应用程序逻辑以对该情况做出反应。我想这完全取决于您的消费者速度/并行性/可扩展性和对100%可靠性的需求。我想大多数使用PUB/SUB的应用程序不需要100%的可靠性,因此采用了这种设计选择。我想,是否难以在套接字上注册回调以在特定队列大小上调用呢? - easytiger
你的意思是绕过ZeroMQ层,在Linux套接字层设置/检查队列大小吗? - fortytwo
1
我是指添加功能到ZeroMQ,使您可以在它决定放弃任何内容时得到回调。 - easytiger
我认为它需要一些工作,所以我不愿意去碰 ZeroMQ 库;官方的理由是“这会使更新更加困难”(真正的原因是懒惰)。 - fortytwo

1
你似乎在询问容错性,而PUB/SUB并不是最理想的选择。不仅可能会达到HWM,而且如果订阅客户端死亡并重新启动,则会在此期间错过发布者发送的消息。对于ZMQ v2来说,默认的PUB/SUB HWM是无限的,但在v3中更改为1000,因为由于消息排队速度比发送速度快,系统会因内存而崩溃。当平均消息速率在网络带宽范围内时,1000似乎是突发消息的合理值。你可以添加一个递增的消息编号到消息中,并让订阅者监视它,以了解何时会丢失消息。你可以选择将此号码放置在其自己的框架中或不放置;总体简单性将是决策因素。我不认为有可能确定何时特别丢失消息,因为已经达到了HWM。

2
谢谢@john。就像我之前回复中提到的那样,我研究了一下如何添加一个序列号,正如您在指南中特别提到的自杀式蜗牛模式。但我的使用情况有点不同。我真的不关心订阅者;我想让我的发布者知道他是否因为达到HWM而丢失了消息。我想你是对的;pub-sub可能不是我需求的最佳选择。我想我将不得不在ZeroMQ传输的基础上构建可靠性应用程序。 - fortytwo
@John > "当平均消息速率在网络带宽内时,1000似乎是突发消息的合理值。" 这绝对不是真的。zeromq的重点是让您不必为应用程序添加环形缓冲区和2个线程。即使这样,我几乎可以肯定,在我开始使用pub/sub之前,我还没有饱和我的10GB nics与zeromq读取。 - easytiger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接