我正在尝试创建一个消费者,该消费者将订阅多个队列,然后在消息到达时处理这些消息。
问题在于,当第一个队列中已经存在一些数据时,它会消耗第一个队列并永远不会去消费第二个队列。 但是,当第一个队列为空时,它会进入下一个队列,然后同时消费两个队列。
我最初实现了线程,但希望避免使用它,因为 pika 库可以在没有太多复杂性的情况下为我完成。以下是我的代码:
import pika
mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print body
mq_channel.basic_ack(delivery_tag=method.delivery_tag)
mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()
php-amqplib
客户端,它按预期工作。我向两个队列预先发布消息,然后所有消息都被消费。 - old_sound