使用Python / Pika消费多个队列

44

我正在尝试创建一个消费者,该消费者将订阅多个队列,然后在消息到达时处理这些消息。

问题在于,当第一个队列中已经存在一些数据时,它会消耗第一个队列并永远不会去消费第二个队列。 但是,当第一个队列为空时,它会进入下一个队列,然后同时消费两个队列。

我最初实现了线程,但希望避免使用它,因为 pika 库可以在没有太多复杂性的情况下为我完成。以下是我的代码:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()

1
我尝试了您的代码,只是添加了一个记录器以防止异常,并声明了队列。代码按预期工作。我向每个队列发布了一些消息,这些消息被路由并在CLI上回显。 - old_sound
3
就是这个意思。很奇怪,对吧?你有任何想法吗? - user3295878
它和其他客户端一样工作吗?您可以在任何其他客户端上试用吗?如果这是特定于 Pika 的问题,就必须提出来。尽管 Gavin 提供了一个好建议,但已经实施了。 - user3295878
我刚试了一下php-amqplib客户端,它按预期工作。我向两个队列预先发布消息,然后所有消息都被消费。 - old_sound
好的,我本来打算提出这个问题,但后来发现 Gavin 是 pika 的作者。现在看来,解决这个问题是 Gavin 的责任了。 - user3295878
显示剩余4条评论
3个回答

27

一种可能的解决方案是使用非阻塞连接并消费消息。

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(queue='queue1', on_message_callback=callback)
    channel.basic_consume(queue='queue2', on_message_callback=callback)


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

这将连接到多个队列,并根据情况消费消息。


1
你能告诉我末尾的%2F的目的是什么吗? - Rápli András
3
连接到rabbitmq时,需要指定虚拟主机。默认主机为/,会被转义为%2f - Chillar Anand
1
值得注意的是,这段代码在使用Pika 1.1.0时对我来说无效。只需要在on_open方法中添加on_open_callback=:connection.channel(on_open_callback=on_channel_open)并在on_channel_open方法中添加on_message_callback=:channel.basic_consume(on_open_callback=callback, queue='queue1') channel.basic_consume(on_open_callback=callback, queue='queue2') - StratocastFlo
有没有一种方法可以在队列之间定义优先级? - ThunderPhoenix
顺便提一下,您也可以使用带有一个通道的pika阻塞连接来完成此操作。 自2015年以来,pika已经有了测试代码 - Trevor Boyd Smith

2

问题很可能是第一次调用已经发出了Basic.Consume并在第二次调用之前从预填充队列中接收了消息。您可以尝试将QoS预取计数设置为1,这将限制RabbitMQ一次只发送一个消息。


它已经被设置为1,就像代码中看到的那样。还有什么其他想法吗? - user3295878
还有一件事,我认为消费者在调用start_consuming之前不会真正开始消费。不过需要进行验证。 - user3295878
嘿,Gavin,我刚刚尝试了使用Python的kombu库的这个功能,它按预期工作。 - user3295878
我会尝试使用异步后端并告诉你。 - user3295878
我按照 https://pika.readthedocs.org/en/0.9.13/examples/asynchronous_consumer_example.html 的示例进行操作,结果一切都符合预期。首先第一个队列被完全消费,然后第二个队列也相应地被消费。 - user3295878
显示剩余2条评论

1

与上面第一个答案中的评论类似,我使用pika 1.1.0并采用以下方法获得了类似的结果:

import pika

def queue1_callback(ch, method, properties, body):
  print(" [x] Received queue 1: %r" % body)

def queue2_callback(ch, method, properties, body):
  print(" [x] Received queue 2: %r" % body)

def on_open(connection):
  connection.channel(on_open_callback = on_channel_open)


def on_channel_open(channel):
  channel.basic_consume('queue1', queue1_callback, auto_ack = True)
  channel.basic_consume('queue2', queue2_callback, auto_ack = True)

credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)

Try:
  connection.ioloop.start()
except KeyboardInterrupt:
  connection.close()
  connection.ioloop.start()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接