XPUB会接收消息-它所接收的唯一消息是来自连接的订阅者的订阅,这些消息应该通过XSUB原样向上转发。
中继消息最简单的方法是使用zmq_proxy:
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
pub = ctx.socket(zmq.PUB)
pub.bind(pub_url)
zmq.proxy(xpub, xsub, pub)
它将中转xpub和xsub之间的消息。如果需要,您可以添加一个PUB套接字以监视通过其传递的流量,同时保留原有功能。
如果您希望在中间插入用户代码以实现额外的路由逻辑,则可以执行以下操作来重新实现zmq_proxy
的内部循环:
def broker(ctx):
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
poller = zmq.Poller()
poller.register(xpub, zmq.POLLIN)
poller.register(xsub, zmq.POLLIN)
while True:
events = dict(poller.poll(1000))
if xpub in events:
message = xpub.recv_multipart()
print "[BROKER] subscription message: %r" % message[0]
xsub.send_multipart(message)
if xsub in events:
message = xsub.recv_multipart()
# print "publishing message: %r" % message
xpub.send_multipart(message)
# insert user code here
zmq.proxy(xsub, xpub)
(在 C 中是zmq_proxy()
)替换从poller = …
开始的所有内容。唯一需要注意的是,为了查看传输中的消息,你需要将一个捕获套接字作为第三个参数传递进去,这需要更多的操作。 - Marcelo Cantos