皮卡加兔子消息队列:将basic_qos设置为prefetch=1仍然似乎会消耗队列中的所有消息

22

我有一个Python工作客户端,可以启动10个工作进程,每个进程都连接到一个RabbitMQ队列。就像这样:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

我的问题是,尽管在通道上设置了basic_qos,但第一个启动的worker会接受队列中的所有消息,而其他worker则空闲等待。我可以在rabbitmq界面中看到这一点,即使我将 worker_count 设置为1并将50条消息放入队列中,所有50条消息都进入“未确认”桶中,而我期望有1条未确认和另外49条就绪。

为什么它不起作用?

1个回答

32

我似乎通过移动basic_qos的调用位置解决了这个问题。

channel = connection.channel()之后立即放置它似乎改变了行为,符合我的期望。


1
谢谢!那解决了问题。顺便说一下,这很难调试。 - Sajuuk
1
@Hiagara,是的,我今天也遇到了这个问题。令人惊讶的是,将近5年过去了,API中仍然没有明确或记录这一点。 - Jordan
12
我认为我们应该在basic_consume之前声明basic_qos,因为basic_consume在初始化时使用这个设置。 - rborodinov
2
同意@rborodinov的观点。我在basic_consume之后立即使用了basic_qos,但它没有起作用。将它们交换一下,现在就可以正常工作了。 - Highstaker
1
我在设置basic_consume时还必须将auto_ack=False才能使其正常工作。否则它仍会消耗比预期更多的消息。 - Tobias
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接