我想编写一个服务器/客户端脚本,使用一个服务器来分发任务,并有多个工作进程来执行它们。但问题是,我的分发器会很快将任务填满内存。
我试图在绑定之前设置HWM,但没有成功。它仅在工作者连接后立即发送消息,完全无视已设置的HWM。我还有一个汇总器,用于记录完成的任务。
我试图在绑定之前设置HWM,但没有成功。它仅在工作者连接后立即发送消息,完全无视已设置的HWM。我还有一个汇总器,用于记录完成的任务。
server.py
import zmq
def ventilate():
context = zmq.Context()
# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.setsockopt(zmq.SNDHWM, 30) #Big messages, so I don't want to keep too many in queue
sender.bind("tcp://*:5557")
# Socket with direct access to the sink: used to syncronize start of batch
sink = context.socket(zmq.PUSH)
sink.connect("tcp://localhost:5558")
print "Sending tasks to workers…"
# The first message is "0" and signals start of batch
sink.send('0')
print "Sent starting signal"
while True:
sender.send("Message")
if __name__=="__main__":
ventilate()
worker.py
import zmq
from multiprocessing import Process
def work():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Process t asks forever
while True:
msg = receiver.recv_msg()
print "Doing sth with msg %s"%(msg)
sender.send("Message %s done"%(msg))
if __name__ == "__main__":
for worker in range(10):
Process(target=work).start()
sink.py
import zmq
def sink():
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")
# Wait for start of batch
s = receiver.recv()
print "Received start signal"
while True:
msg = receiver.recv_msg()
print msg
if __name__=="__main__":
sink()
zmq.zmq_version()
和zmq.__version__
。 - three_pineapples