如何在ZMQ sub server socket断开连接时,使ZMQ pub client socket缓存消息。

3

假设有两个应用程序,其中应用程序A使用发布者客户端不间断地向应用程序B进行数据流传输,而应用程序B具有一个子服务器套接字来接受该数据,我们如何配置应用程序A中的pub客户端套接字,以便当B不可用(例如它正在重新部署、重新启动)时,A缓存所有未决消息,并且当B变为可用时,缓冲的消息会通过并使套接字追上实时流?

简而言之,我们如何使PUB客户端套接字在SUB服务器不可用时缓冲一定量的消息?

PUB客户端的默认行为是进入静默状态,但如果我们可以将其更改为限制大小的缓冲区,那将非常好,这在zmq中是否可能?还是我需要在应用程序级别上进行处理...

我尝试在我的套接字中设置HWM和LINGER,但如果我没有错的话,它们只负责慢消费者情况,其中我的发布者连接到订阅者,但订阅者太慢,以至于发布者开始缓冲消息(hwm将限制这些消息的数量)...

我正在使用jeromq,因为我针对jvm平台。


一个更好的策略是,一旦检测到订阅者已经“断开连接”,就告诉发布者停止发布。你考虑过使用 socket.monitor 并监听 ZMQ_EVENT_DISCONNECTED 事件吗? - smac89
实际上,当我思考这个问题时,我意识到这已经是默认行为了。如果没有任何连接到发布者套接字上,它将简单地丢弃消息。我想我的想法是,如果你有一条数据流传输到发布者,那么一旦检测到事件,这条数据流就应该停止。 - smac89
默认行为是丢弃消息,但我认为这是非常普遍的情况。如果您是发布者,且您的客户端套接字知道消息应传递到哪里,那么您就知道如果该目标不可用,您需要进行缓冲以避免消息丢失。 - vach
仅看zeromq HWM docs,它说:“当您的套接字达到其HWM时,它将根据套接字类型阻止或丢弃数据PUB和ROUTER套接字如果达到其HWM,则会丢弃数据,而其他套接字类型则会阻塞。在inproc传输中,发送方和接收方共享相同的缓冲区,因此真正的HWM是由双方设置的HWM之和。”所以我的问题是,您是否正在寻找一个外部内存缓冲区作为解决方案?除了我的建议之外,使用大HWM是唯一的解决方案。 - smac89
我能理解你认为在订阅者连接时缓冲所有消息会很容易,但你似乎忽略了有限的内存和需要猜测要给发布者多少内存的部分。如果订阅者继续失败,那怎么办?我的最初建议是检测到断开连接的订阅者将与你缓冲所有内容的想法完全相同。唯一的区别是你的想法有可能使用更多内存,并且如果订阅者无法跟上,则不太可靠。 - smac89
显示剩余7条评论
3个回答

3

首先,欢迎来到Zen-of-Zero的世界,延迟最为重要。

序言:

ZeroMQ由Pieter HINTJENS团队设计,其中最有经验的大师Martin SUSTRIK被命名为第一位。该设计专业精良,以避免任何不必要的延迟。因此,问及是否具有(有限的)持久性?不,先生,未经确认的“PUB/SUB可扩展正式通信模式原型”将不会内置它,因为这会增加问题并降低性能和可伸缩性(添加延迟,处理和内存管理)。

如果需要(有限的)持久性(用于不存在远程SUB端代理的连接),请随意在应用程序端实现它,或者可以设计并实现一个新的符合ZMTP标准的行为模式原型,扩展ZeroMQ框架,如果这项工作进入稳定和公开接受的状态,但不要请求高性能,削减延迟的标准PUB/SUB朝这个方向进行修改。这绝对不是正确的做法。

解决方案?

使用双指针循环缓冲区,工作在一种类似于Persistence-PROXY(app-side-managed)模式下,App端可以轻松实现您添加的逻辑,但位于PUB发送方之前。

如果您的设计还喜欢使用最近提供的内置ZeroMQ-socket_monitor组件来设置额外的控制层并在那里接收事件流,则您的设计可能会成功地从ZeroMQ内部细节中挤出一些额外的内容。这些与网络和连接管理相关的事件可以为您的(app-side-managed)-Persistence-PROXY带来更多的启示。

然而,请注意:

_zmq_socket_monitor()_方法仅支持面向连接的传输,即TCP、IPC和TIPC。

所以如果计划使用最终有趣的传输类之一,就可以直接忘记这个{ inproc:// | norm:// | pgm:// | epgm:// | vmci:// }

注意!

我们的社区荣誉成员smac89提供的信息存在不准确、甚至错误的部分,他尽力回答了您在评论中表达的额外兴趣:

"...zmq 优化了主题发布吗?比如如果你快速地在一些 100 字符长的 topic 上发布,它实际上是每次都发送 topic,还是将其映射到某个 int 并随后发送 int...?"

他告诉你:

"它将始终发布topic。当我使用pub-sub模式时,我通常先发布topic,然后再发布实际消息,所以在订阅者中,我只需读取第一帧并忽略它,然后读取实际消息"

ZeroMQ 不是这样工作的。没有“单独”的<topic>,后面跟着一个<message-body>,相反是相反的

TOPIC 和主题过滤的机械化工作方式非常不同。

1) 你永远不知道,谁会调用 .connect()
例如,可以几乎确定版本2.x到版本4.2+将以不同的方式处理主题过滤(ZMTP:RFC定义了初始能力版本握手,让Context-实例决定使用哪个版本的主题过滤:
ver 2.x 通常将所有消息移动到所有对等方,并让所有SUB-side(来自ver 2.x+)接收消息(并让SUB-side Context-实例处理本地topic列表过滤处理)


ver 4.2+ 确定在**PUB-side Context-实例上执行topic-列表过滤处理(CPU使用率增加,网络传输相反),因此您的SUB-side将永远不会收到任何“无用”的字节读作“未订阅”消息。

2) (你可以这样做,但是)没有必要将“主题”分成因此暗示的多帧消息的第一帧。也许恰恰相反(在高性能、低延迟的分布式系统设计中这样做是一种反模式)。

主题过滤过程被定义为逐字节工作,从左到右,对于主题列表成员值,进行与传递的消息有效负载的每个模式匹配。

添加额外数据,额外的帧管理处理只会增加端到端的延迟和处理开销。这不是一个好主意,而是应该进行适当的设计工作。


结语:

在专业的设计中,没有容易的胜利,也没有低垂的果实,特别是当或超低延迟是设计目标时。

另一方面,确保ZeroMQ框架考虑到了这一点,并且这些努力得到了稳定、终极性能良好、平衡的一套工具,用于智能(按设计)、快速(运行)和可扩展(魔鬼也会羡慕)的信令/消息服务,人们喜欢使用它们,正因为这种设计智慧。

希望您享受ZeroMQ的使用,并随意在您选择的应用程序套件内添加任何额外的功能“前端”。


1
虽然感谢您的回答,尤其是风格,但我不同意这在任何方面都需要在性能或延迟上做出妥协。老实说,我很惊讶这是不可能做到的,并且不是zeromq的默认行为...缓冲未传递的消息以在有机会时传递它们不应该是太过分的要求... - vach
虽然我理解了被评论的观点,但我无法确认如果添加了此功能,它是否有足够的理由被包含在管道中。许多服务处理消息流中介序列的最新状态更新。如果存在计算机视觉过程,消耗最新的闭路电视快照以检测潜在的周界侵犯,那么它的焦点集中在最近的1个消息上,而不是最近历史记录的“(暂时)存档部分”-恰恰相反。即使zmq.CONFLATE选项也更进一步,跳过除已传递的1条消息之外的所有消息。 - user3666197
ZeroMQ 的目标不是创建事件历史的数字副本,而是提供智能、高性能、低延迟、几乎线性可扩展的解决方案(这将是昂贵的部分,如果数百、数千和数万个订阅者进出服务,如您所建议的评论 - 只是数据量的 GB + 大量 [us ~ ms] 的数据流通过一些硬件可用的有争议的 mem-I/O 通道进入/从 RAM 中流出)。不,先生,将其硬编码到 PUB/SUB 中将是一个绝妙的想法。只需测试基于代理的持久性并比较增加的延迟即可。 - user3666197
我会试一下,我打算在grpc中实现发布/订阅,添加这个缓冲功能并比较性能...我不认为我会失望...我选择zmq的原因是它的思想简单,有专门的智能套接字(而不是性能和所有这些繁琐的东西,在jvm版本中可疑)。 - vach

3

我在这里发布一个快速更新,因为其他两个答案(虽然很有用但事实上是错误的),我不希望其他人从我的被接受的答案中获得错误信息。 不仅可以使用zmq来完成此操作,而且它实际上是默认行为

诀窍是,如果您的发布者客户端之前从未连接到订阅服务器,则会不断丢弃消息(这就是为什么我认为它不会缓冲消息的原因),但是如果您的发布者连接到订阅者并且您重新启动订阅者,则发布者将缓冲消息直到达到HWM,这正是我所要求的...因此简而言之,发布者想知道有没有人在另一端接收消息,只有在这之后它才会缓冲消息...

下面是一些演示这一点的示例代码(您可能需要进行一些基本编辑才能编译它)。

我只使用了这个依赖项org.zeromq:jeromq:0.5.1

zmq-publisher.kt

fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.PUB)

   socket.hwm = 10000
   socket.linger = 0
   "connecting to $uri".log()
   socket.connect(uri)

   fun publish(path: String, msg: Msg) {
      ">> $path | ${msg.json()}".log()
      socket.sendMore(path)
      socket.send(msg.toByteArray())
   }

   var count = 0

   while (notInterrupted()) {
      val msg = telegramMessage("message : ${++count}")
      publish("/some/feed", msg)
      println()

      sleepInterruptible(1.second)
   }
}

当然,还有zmq-subscriber.kt


fun main() {
   val uri = "tcp://localhost:3006"
   val context = ZContext(1)
   val socket = context.createSocket(SocketType.SUB)

   socket.hwm = 10000
   socket.receiveTimeOut = 250

   "connecting to $uri".log()
   socket.bind(uri)

   socket.subscribe("/some/feed")

   while (true) {
      val path = socket.recvStr() ?: continue
      val bytes = socket.recv()
      val msg = Msg.parseFrom(bytes)
      "<< $path | ${msg.json()}".log()
   }
}

先尝试运行发布者(publisher),不开启订阅者(subscriber)。然后当你启动订阅者时,你错过了到目前为止所有的消息...现在,不需要重新启动发布者,停止订阅者并等待一段时间,然后再次启动它。

这是我一个服务实际受益于此的示例...以下是结构:[当前服务]sub:server <= pub:client[被重启的服务]sub:server <=* pub:client[多个发布者]

因为我在中途重启了服务,所以所有的发布者都开始缓存它们的消息,最终的服务观察到每秒约200条消息的下降至0(其中1或2条是心跳包),然后突然出现1000多条消息的爆发,因为所有的发布者都刷新了它们的缓冲区(重新启动大约需要5秒钟)......在这里我实际上没有失去任何一条消息...

enter image description here

请注意,你必须有subscriber:server <= publisher:client配对(这样发布者就知道“只有一个地方需要将这些消息传递给”(你可以尝试在发布者上绑定并在订阅者上连接,但你将不再看到发布者缓冲消息,因为它的可疑性是:刚刚断开连接的订阅者是否停止需要数据或者是它出现故障了)。


2
正如我们在评论中讨论的那样,如果publisher没有任何连接,它就无法缓存消息,而只会删除任何新消息:
从文档中可以看出:
如果发布者没有连接的订阅者,那么它将简单地丢弃所有消息。
这意味着你的缓冲区需要在zeromq之外。你的缓冲区可以是一个列表、一个数据库或者你选择的任何其他存储方法,但你不能使用你的发布者来做到这一点。
现在下一个问题是如何检测订阅者何时连接/断开连接。这是必要的,以告诉我们何时需要开始从缓冲区读取/填充缓冲区。
我建议使用Socket.monitor并监听ZMQ_EVENT_CONNECTEDZMQ_EVENT_DISCONNECTED,因为这些事件会告诉您客户端何时连接/断开连接,从而使您能够切换到填充所选缓冲区的操作。当然,可能有其他方法可以实现此功能,而不直接涉及zeromq,但这取决于您自己的决定。

谢谢,你知道zmq是否优化了主题发布吗?比如说如果你快速地在某个100个字符长的主题上不断发布,它是每次都发送主题还是将其映射到某个整数并随后发送整数呢?(这是另一个问题,但也许你知道 :)) - vach
1
@vach 它将始终发布主题。当我使用发布-订阅模式时,通常先发布主题,然后再发布实际消息,因此在订阅者中,我只需读取第一帧并忽略它,然后再读取实际消息。 - smac89
@smac89 你可能已经知道,很久以前 ZeroMQ 就不是这样工作的了。此外,你选择发送多帧消息负载(虽然合法,但如果性能和处理开销不是你关心的问题)太低效,并且远离智能主题过滤的初始设计和实现方式。这是完全不必要的,因为它“加倍”了如何在 ZeroMQ 的 PUB/SUB 原型消息流处理“引擎盖下”实际工作的关注点。 - user3666197
@user3666197 感谢您的纠正。当您说主题过滤不按照我描述的方式工作时,您是指订阅者没有收到主题帧,只有实际消息被发送了吗?我使用 jzmq 和 zmq 版本 4.3,将消息发布给订阅者的方式是先用 sendMore 发送主题,然后用 send 发送 flatbuffer 字节。在订阅方面,我使用 recv 接收主题,然后下一个带有标志零的 recv 给出 flatbuffer。我想知道的是,我在这里做错了什么,不符合 zmq 的方式,我该如何解决? - smac89
ZeroMQ API文档非常清晰地解释了它的工作原理(+Martin SUSTRIK在250bpm.com上有关于使过滤器智能和快速的性能相关文章)。你可以阅读https://stackoverflow.com/a/58397285/3666197并进行实验:**迭代** i = 0..2 ^ 31 并发送 PUB.send( str( i ) ),然后有四个不同的SUB,每个都.connect()-ed并订阅了不同的主题字符串,如引用链接所示,并使每个SUB在屏幕上打印.recv()接收到的消息。你会看到差异-每个SUB已经接收到和未接收到的内容。 - user3666197
@user3666197 我发现这个实验并没有回答我的问题。我知道过滤器可以通过查找消息前缀来有效地将消息发送到正确的订阅者,但它也会发送主题,这就是我在最初被问到的问题中回答的内容。我可能用了错误的术语来描述我的意思,但我并不是想说为了使过滤器工作需要发送两条消息。我试图描述的正是 这篇博客 中最后两个代码示例所做的事情。 - smac89

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接