我有一个Python工作客户端,可以启动10个工作进程,每个进程都连接到一个RabbitMQ队列。就像这样:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
我的问题是,尽管在通道上设置了basic_qos,但第一个启动的worker会接受队列中的所有消息,而其他worker则空闲等待。我可以在rabbitmq界面中看到这一点,即使我将 worker_count
设置为1并将50条消息放入队列中,所有50条消息都进入“未确认”桶中,而我期望有1条未确认和另外49条就绪。
为什么它不起作用?
basic_consume
之前声明basic_qos
,因为basic_consume
在初始化时使用这个设置。 - rborodinovbasic_consume
之后立即使用了basic_qos
,但它没有起作用。将它们交换一下,现在就可以正常工作了。 - Highstakerbasic_consume
时还必须将auto_ack=False
才能使其正常工作。否则它仍会消耗比预期更多的消息。 - Tobias