使用RabbitMQ,有没有一种方法可以在不进行出列操作的情况下查看队列内容?

52
作为学习RabbitMQ和Python的途径,我正在开发一个项目,可以在多台计算机之间分配h264编码任务。基本功能已经完成,我编写了一个守护进程,在Linux或Mac上运行,连接到队列,接受任务并使用HandBrakeCLI进行编码。一旦编码完成,它会确认消息。我还建立了一个简单的工具来将项目推入队列。
现在,我想扩展推送项目到队列的工具的功能,以便能够查看队列中的内容。我知道可以查看队列中有多少项目,但我希望能够获取实际的消息,这样就可以显示等待编码的电影或电视节目是什么了。我的想法是,当编码客户端完成任务时,队列管理器会收到来自编码客户端的消息,然后刷新队列列表。
我知道有一种复杂的方法可以使队列管理器的列表与实际工作队列保持同步,但我希望这种方法是“持久性”的,也就是说,即使关闭队列管理器,以后重新打开也能看到队列中的内容。

我通过 Twitter 收到了这条消息 - “不,RabbitMQ 的队列是纯 FIFO 结构,没有 peek。但是,可以查看带有 acks 的 basic.consume/get。” - Dustin
3个回答

48

直接浏览队列不受支持,但如果您声明一个不自动确认且不ACK收到的消息的队列,则可以查看其中所有内容。查看完后,发送一个CANCEL到通道,或断开并重新连接以导致所有消息重新排队。这会增加消息头中的一个数字,但除此之外不会影响消息。

我构建了一个应用程序,其中消息排序并不是非常重要,我经常以这种方式浏览队列。如果我发现问题,我会将消息转储到文件中,修复它们并重新提交。

此外,如果您只需要偶尔查看一两条消息,可以使用RabbitMQ管理插件来查看。

另外,如果您只需要消息计数,则每次声明队列时或在基本的get命令上获取即可。


2
只是为了扩展一下有关消息计数的获取,您可以声明一个带有passive=True的队列,如果它不存在,则不会创建队列。如果存在,则会返回消息和消费者的数量。 - Dave Foster
RabbitMQ有一个专有的扩展名叫做Firehose Tracer,可以访问所有发布的消息。详情请见:https://www.rabbitmq.com/firehose.html - Tom Pohl

7

基于你的回答,为了使其他人更容易理解,我在这里提供一个no_ack示例:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='Q.hello')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='Q.hello')

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

ch.basic_ack(delivery_tag=method.delivery_tag)在这种情况下有效地作为“从队列中删除”的命令。+1 - AlejandroVD
2
这非常有帮助,但我遇到了一个问题,就是无法找到一种不会无限制阻塞的方法来实现这个功能。基本上,我只想查看队列中固定数量的消息,并且不需要等待更多的消息进来。有什么想法吗? - zjm555

1

你想要做的是浏览队列,但我从this了解到RabbitMQ目前还不支持该功能。


10
虽然没有直接支持这个功能,但您可以声明一个不具备自动确认功能的队列,并且在收到消息后不进行确认,这样就可以查看其中的所有内容。查看完毕后,发送一个 CANCEL 到通道上,或者断开连接并重新连接以使所有消息重新排队。这会增加消息标头中的一个数字,但是不会对消息本身进行任何更改。 - Michael Dillon
5
@MichaelDillon - 那个评论应该作为一个回答。 - cdeszaq
4
@cdeszaq: 是的,你是对的。我写了一个带有更多信息的答案。 - Michael Dillon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接