ZMQ Pub 需要使用 while 循环来发送消息。

3
我注意到zmq.PUB的行为很奇怪,它需要一个while循环来发送消息。例如:
假设我有一个等待发布者的订阅者:
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.SUBSCRIBE, '')
subscriber.connect ('tcp://*:5555')
while True:
    msg = socket.recv()

我想从pub发送一条消息:

context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://*:%5555')
publisher.send(''.join(sys.argv[1]))

由于某些原因,此消息将无法传递到子级。您可以通过在发送消息 publisher.send(''.join(sys.argv[1])) 之前添加 whilefor 循环来修复此问题。

为什么会这样?我是否必须始终使用循环与发布者一起分发消息给多个工作程序?


你的 publisher.send 调用是脚本中的最后一行吗(也就是说,调用之后脚本立即退出)? - larsks
@larsks 是的,但是,它是否应该在 .send 上发送消息?如果不是,那么对我来说似乎是错误的设计,因为我正在使用阻塞的 zmq 模块而不是带有 gevent 的模块。 - izdi
1个回答

2

实际上,这里有几个问题。

首先,你发布的代码中存在许多错误(例如,你不能连接到 tcp://*:5555),但我会假设它代表了你想要做的事情。

更新

我将保留我的原始答案,因为我认为它仍然有用,尽管与你的问题没有直接关系(可能是因为早上缺乏咖啡因,谁知道呢?)。

由于你的发布者使用单个消息调用 send 然后立即退出,所以订阅者可能永远没有时间去调用 connect()。通常你期望调用 bind 的代码是长时间运行的进程。如果在 bindsend 调用之间插入一个 sleep,则一切都将按预期工作:

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind('tcp://*:5555')

time.sleep(1)
pub.send('message 1')

假设以下订阅者,这将正常工作:

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.connect('tcp://localhost:5555')

while True:
    msg = sub.recv()
    print msg

然而,这并不是一个好的解决方案...任何使用sleep进行同步的方法都注定会失败。另一种解决方法是让发布者打开一个PUB套接字和一个REP套接字。为了向所有订阅者发送消息,您的工具将打开一个REQ套接字到代理,代理将把消息发布给所有订阅者。

下面是原始答案:


你遇到的问题可能是这个。由于你的订阅者调用了bind,而你的发送者调用了connect,所以很可能你只是因为连接成功和订阅者成功订阅消息之间存在延迟而丢失了消息。还可以参见FAQ中的问题“为什么我在绑定套接字与连接套接字时看到不同的行为?”。

如果我们有这个订阅者的代码来证明这一点:

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.SUBSCRIBE, '')
sub.bind('tcp://*:5555')

while True:
    msg = sub.recv()
    print msg

还有以下的发布者:

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.connect('tcp://localhost:5555')

pub.send('message 1')
pub.send('message 2')
time.sleep(1)
pub.send('message 3')

订阅者最终可能会总是打印输出:
message 3

这是因为首次 send 操作太快。通常情况下,这并不被认为是一个问题,因为 pub/sub 明确地不是一种可靠的传输机制。在典型的使用情况中,有一个单一的发布者调用 bind,多个订阅者调用 connect,可能并不都一开始就存在或持续存在。不能保证每个订阅者都会收到每个消息。如果您需要可靠地传输单个消息,请考虑使用 REQ/REP 套接字对。

实际上,在我的代码中,我在SUB上使用connect,在PUB上使用bind。而且我还需要将我的消息分发到多个订阅者。 - izdi
那不是你发布的示例,这使得很难给出准确的答案。我会再看一遍。 - larsks
嗯,从一开始就是这样的,看看你的例子中的区别。我同意等待套接字发送消息时使用sleep,但实际上它并没有真正发送消息;然而,如果你使用任何类型的循环,它将会发送消息。 - izdi
是吗?那我可能还没喝够咖啡。 - larsks

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接