ZeroMQ:PUSH 上的 HWM 无效

5
我想编写一个服务器/客户端脚本,使用一个服务器来分发任务,并有多个工作进程来执行它们。但问题是,我的分发器会很快将任务填满内存。
我试图在绑定之前设置HWM,但没有成功。它仅在工作者连接后立即发送消息,完全无视已设置的HWM。我还有一个汇总器,用于记录完成的任务。

server.py

import zmq

def ventilate():
    context = zmq.Context()

    # Socket to send messages on
    sender = context.socket(zmq.PUSH)
    sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
    sender.bind("tcp://*:5557")


    # Socket with direct access to the sink: used to syncronize start of batch
    sink = context.socket(zmq.PUSH)
    sink.connect("tcp://localhost:5558")

    print "Sending tasks to workers…"

    # The first message is "0" and signals start of batch
    sink.send('0')
    print "Sent starting signal"

    while True:
        sender.send("Message")



if __name__=="__main__":
    ventilate()

worker.py

import zmq
from multiprocessing import Process

def work():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Socket to send messages to
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    # Process t asks forever
    while True:
        msg = receiver.recv_msg()
        print "Doing sth with msg %s"%(msg)     
        sender.send("Message %s done"%(msg))

if __name__ == "__main__":
    for worker in range(10):        
        Process(target=work).start()

sink.py

import zmq

def sink():
    context = zmq.Context()

    # Socket to receive messages on
    receiver = context.socket(zmq.PULL)
    receiver.bind("tcp://*:5558")

    # Wait for start of batch
    s = receiver.recv()
    print "Received start signal"
    while True:
        msg = receiver.recv_msg()
        print msg


if __name__=="__main__":
    sink()

我会尝试重现您的问题。您能告诉我您正在使用的PyZMQ和ZMQ版本吗?请运行zmq.zmq_version()zmq.__version__ - three_pineapples
ZMQ版本为4.0.3,pyzmq版本为13.1.0。 - Elvin
“嗯,这是一个很烦人的组合。你能否更新到pyzmq 14.0.1并测试一下(我不介意你使用哪个zmq版本,只要让我知道)。我在Windows上使用pyzmq 13.1.0和zmq 3.x.x,更改zmq版本非常麻烦,除非更新到pyzmq v14,但我想确保您仍然可以看到该版本的问题,然后再尝试重现。” - three_pineapples
测试过v14版本,问题依旧。 - Elvin
2个回答

5
好的,我进行了一些尝试,我认为问题不在于推送高水位线(PUSH HWM),而是您无法为PULL设置高水位线。如果您查看此文档,您会发现它在HWM上说“N/A”,即无法采取任何HWM操作。
看起来PULL套接字每次都需要处理数百条消息(我尝试设置PULL套接字的HWM,以防万一它对PULL套接字有所作用。但它没有)。我通过更改供氧器,发送具有递增整数的消息,并将池中的每个工作进程更改为在调用recv()之间等待2秒钟来证明这一点。工作进程打印出他们正在处理的巨大不同的整数消息。例如,一个工作进程将处理消息10,而下一个工作进程将处理消息400。随着时间的推移,你会看到处理消息10的工作进程现在正在处理消息11、12、13等,而另一个工作进程正在处理401、402等。
这表明ZMQ_PULL套接字在某个地方缓冲消息。因此,虽然ZMQ_PUSH套接字确实具有HWM,但是PULL套接字在快速请求消息,即使它们实际上没有被recv()调用访问。因此,如果连接了PULL套接字,则会有效地忽略PUSH HWM。我认为,就我所知,您无法控制PULL套接字的缓冲区长度(我希望RCVHWM套接字选项可以控制此选项,但似乎不行)。
当然,这种行为引出了一个问题,那就是ZMQ_PULL HWM选项的作用是什么,只有在您还能控制接收套接字HWM的情况下,它才有意义。
到这个点上,我会向0MQ people询问是否您错过了一些显而易见的东西,或者这是否被视为一个bug。
抱歉我不能提供更多的帮助!

非常感谢您迄今所做的努力。我发现设置setsockopt(zmq.RCVBUF,2)实际上会减慢速度。默认情况下,它设置为0,这意味着它采用操作系统的默认缓冲区大小。不知道它是多少。它仍然没有完全达到我的要求,但它更接近了。 - Elvin

1
ZeroMQ在套接字的发送和接收端都有缓冲区,因此您需要在代码中为PUSH和PULL套接字设置高水位标记(在bind()connect()之前)以确保顺畅运行。
在Python绑定中,现在可以通过socket.hwm = 1方便地完成这一操作,它将同时设置ZMQ_SNDHWMZMQ_RCVHWM

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接