如何在ZMQ中实现(X)PUB/(X)SUB消息的代理/经纪人?

12

我在阅读有关如何在ZMQ中为(X)PUB/(X)SUB传递消息创建代理/经纪人的这篇文章。文章中有一张漂亮的图片展示了架构的样子:

数据流: 用户代码 -> PUB -> XSUB -> 用户代码 -> XPUB -> SUB -> 用户代码; 订阅流: 用户代码 <- PUB <- XSUB <- 用户代码 <- XPUB <- SUB <- 用户代码;

但是,当我查看XSUB套接字描述时,由于其“出站路由策略”为N/A,我不知道如何通过它转发所有订阅。

那么,如何在ZeroMQ中实现(取消)订阅转发,有哪些最小用户代码可用于这种转发应用程序(可以插入到简单的发布者订阅者示例之间)?

1个回答

18

XPUB会接收消息-它所接收的唯一消息是来自连接的订阅者的订阅,这些消息应该通过XSUB原样向上转发。

中继消息最简单的方法是使用zmq_proxy

xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)

它将中转xpub和xsub之间的消息。如果需要,您可以添加一个PUB套接字以监视通过其传递的流量,同时保留原有功能。

如果您希望在中间插入用户代码以实现额外的路由逻辑,则可以执行以下操作来重新实现zmq_proxy的内部循环:

def broker(ctx):
    xpub = ctx.socket(zmq.XPUB)
    xpub.bind(xpub_url)
    xsub = ctx.socket(zmq.XSUB)
    xsub.bind(xsub_url)

    poller = zmq.Poller()
    poller.register(xpub, zmq.POLLIN)
    poller.register(xsub, zmq.POLLIN)
    while True:
        events = dict(poller.poll(1000))
        if xpub in events:
            message = xpub.recv_multipart()
            print "[BROKER] subscription message: %r" % message[0]
            xsub.send_multipart(message)
        if xsub in events:
            message = xsub.recv_multipart()
            # print "publishing message: %r" % message
            xpub.send_multipart(message)

        # insert user code here

完整的(Python)工作示例


你可以用 zmq.proxy(xsub, xpub)(在 C 中是 zmq_proxy())替换从 poller = … 开始的所有内容。唯一需要注意的是,为了查看传输中的消息,你需要将一个捕获套接字作为第三个参数传递进去,这需要更多的操作。 - Marcelo Cantos
那样做没有抓住重点,即在设备内添加用户代码。如果您不需要在XPUB和XSUB之间添加代码,则代理可以正常工作(或者在Python中,您可以使用monitored_queue,zmq_proxy的前身)。 - minrk
从“这样的转发应用程序的最小用户代码是什么”这个短语中,我很清楚地看出OP没有一些他们想要在XSUB和XPUB之间注入的代码块。他们只是想要一个最简单的转发解决方案。即使如此,提供更灵活的解决方案也完全可以,但应该让OP知道,如果他们不需要进行MITM操作,存在一个更简单的解决方案。 - Marcelo Cantos
没问题,我会在回答中加入关于何时使用手动代理的澄清。 - minrk
1
C++问题的Python答案 - 太棒了!在语言移植方面,ZMQ团队所做的工作让人赞不绝口。 - Patrick B.

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接