ZeroMQ:如何防止无限等待?

82

我刚开始使用ZMQ。我正在设计一个应用程序,其工作流程如下:

  1. 众多客户端(随机PULL地址)之一向5555的服务器推送请求
  2. 服务器永远在等待客户端推送。当有一个推送到来时,就会为该特定请求生成一个工作进程。是的,工作进程可以同时存在。
  3. 当该进程完成任务后,它会向客户端推送结果。

我认为PUSH/PULL架构适合这种情况。请纠正我,如果我有误。


但是我该如何处理以下情况呢?

  1. 当服务器未能响应时,client_receiver.recv()将无限期等待。
  2. 客户端可能会发送请求,但会立即失败,因此工作进程将永远停留在server_sender.send()处。

那么,我该如何在PUSH/PULL模型中设置类似于超时的功能呢?


编辑: 感谢user938949的建议,我得到了一个有效的答案,现在我分享出来供后人参考。


1
我不是0mq专家,但在很多类似这样的情况下,最好在启动时创建工作池,而不是根据消息创建工作进程。也许我误解了你的意思。 - wberry
好主意。我实际上计划预先派生工作进程。我刚刚意识到,使用0mq可以轻松实现。 - Jesvin Jose
4个回答

88
如果您使用的是 zeromq >= 3.0 版本,则可以设置 RCVTIMEO 套接字选项:
client_receiver.RCVTIMEO = 1000 # in milliseconds

但通常情况下,您可以使用轮询器:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

而且 poller.poll() 接受一个超时时间:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

如果没有要接收的内容,evts 将是一个空列表。

您可以使用 zmq.POLLOUT 进行轮询,以检查发送是否成功。

或者,要处理可能失败的对等方的情况,可以:

worker.send(msg, zmq.NOBLOCK)

可以使用"might suffice",它会立即返回-如果发送无法完成,则会引发ZMQError(zmq.EAGAIN)异常。


1
你能详细说明一下 zmq.NOBLOCK 吗? - Jesvin Jose
你好,每次我们断开并重新连接套接字(在轮询器中),我们是否需要重新注册它? - mariolpantunes
不需要重新注册,除非您关闭套接字并打开一个新的。 - minrk
4
如@Adobri和@mknaf在下面所说:如果使用zmq.RCVTIMEO,则还需要设置zmq.LINGER,否则即使超时后套接字仍然不会关闭。 在Python中,应为socket.setsockopt(zmq.RCVTIMEO, 1000) socket.setsockopt(zmq.LINGER, 0),然后使用message = socket.recv()来接收消息。 - dthor
1
这两行代码在Python中都可以使用:results_receiver.RCVTIMEO = 1000results_receiver.setsockopt(zmq.RCVTIMEO, 1000) - silgon
如果您正在使用“try: except:”模式,还必须引发错误“zmq.ZMQError”。 - silgon

18

在我参考了user938949的答案和http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/后,我做了一个快速hack。如果你有更好的方法,请发表你的答案,我会推荐你的答案

对于那些想要可靠性的持久解决方案,请参阅http://zguide.zeromq.org/page:all#toc64

zeromq的3.0版本(目前是beta版)支持ZMQ_RCVTIMEO和ZMQ_SNDTIMEO中的超时http://api.zeromq.org/3-0:zmq-setsockopt

服务器

zmq.NOBLOCK确保在不存在客户端时,send()不会阻塞。

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

客户端

轮询对象可以监听多个接收套接字(参见上面链接的 "Python 多进程应用 ZeroMQ")。我仅在 work_receiver 上提供了链接。在无限循环中,客户端以 1000 毫秒的间隔进行轮询。如果在该时间内没有收到任何消息,则 socks 对象返回空值。

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

Python有 select 吗?Ruby的库有一个像 IO.select 这样的。你可以使用以下代码:if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end - mixonic

12

如果你只在等待一个socket的话,而不是创建一个Poller,你可以这样做:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

如果你的超时时间因情况而异,可以使用这个方法,而不是设置work_receiver.RCVTIMEO


1
很好的回答 :) - Google
我最喜欢这个答案,适用于简单情况,只需超时一个套接字 - 无需创建轮询器,也无需注册套接字。 - Blindfreddy

11

如果您使用ZMQ_NOBLOCK,则发送操作不会阻塞,但是如果尝试关闭套接字和上下文,则该步骤将阻止程序退出。

原因是套接字等待任何对等方,以确保传出消息得到排队。要立即关闭套接字并清空缓冲区中的传出消息,请使用ZMQ_LINGER并将其设置为0。


2
如果您不使用zmq.LINGER,那么zmq.RCVTIMEO将无法帮助您,因为在超时后套接字仍然不会关闭。这应该添加到所选答案中。 - mknaf

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接