在Pika(AMQP Python)中获取队列大小

40

这是一个简单的问题,但谷歌或Pika开源代码都没有提供帮助。是否有一种方法可以查询当前在Pika中队列的大小(项目计数器)?

6个回答

52

我知道这个问题有点旧了,但是这里有一个使用pika实现的例子。

关于AMQP和RabbitMQ,如果您已经声明了队列,您可以使用passive标志重新声明队列,并保持所有其他队列参数不变。对此声明的响应declare-ok将包括队列中消息的数量。

以下是使用pika 0.9.5的示例:

import pika

def on_callback(msg):
    print msg

params = pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=pika.credentials.PlainCredentials('guest', 'guest'),
    )

# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)

# Open the channel
channel = connection.channel()

# Declare the queue
channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False
    )

# ...

# Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )
print 'Messages in queue %d' % res.method.message_count

这将打印以下内容:

<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0
你可以从message_count成员获得消息数量。

只是留在这里,因为它咬了我并浪费了我几个小时。重要的是要理解,如果消费者没有预取太多消息,那么这只是“真实”的消息数量。请参考https://pika.readthedocs.io/en/stable/modules/channel.html#pika.channel.Channel.basic_qos中的`prefetch_count`。 - Tobias
正如被接受的答案所提到的那样,当basic_consume已经被初始化时,它不起作用,即使有很多消息,它也只会显示message_count=0。 - DZet

25

以下是使用pika获取队列长度的方法(假设您正在使用默认用户和密码在本地主机上)请将q_name替换为您的队列名称。

import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count

6

你尝试过PyRabbit吗?它有一个get_queue_depth()方法,听起来像是你需要的。


2
我不知道PyRabbit。看起来很有前途,我会试试! - Sebastian

5
在AMQP协议中,有两种方法可以获取队列大小。您可以使用Queue.Declare或Basic.Get。如果您正在使用Basic.Consume按照到达顺序消耗消息,那么除非您断开连接(超时)并重新声明队列,否则无法获取此信息,或者获得一条消息但不确认它。在较新版本的AMQP中,您可以主动重新排队消息。
至于Pika,我不知道具体情况,但是Python客户端对于AMQP一直是我的一个问题。通常,您需要monkeypatch类以获取所需的信息,或者允许队列消费者超时,以便您可以在定期间隔内执行其他操作,例如记录统计信息或查找队列中有多少条消息。
另一种解决方法是放弃,并使用Pipe类运行sudo rabbitmqctl list_queues -p my_vhost。然后解析输出以查找所有队列的大小。如果这样做,您将需要配置/etc/sudoers,以不要请求通常的sudo密码。
我祈求有更多Pika经验的其他人回答这个问题,指出您可以执行我提到的所有操作,在这种情况下,我将下载Pika并进行测试。但如果这种情况没有发生,并且您在monkeypatching Pika代码方面遇到困难,请看看haigha。我发现他们的代码比其他Python AMQP客户端库更简单明了,因为它们更接近AMQP协议。

谢谢你提供的sudo rabbitmqctl list_queues -p my_vhost技巧,我会尝试一下。 - Sebastian

0

我发这篇文章只是为了让其他人看到这个讨论。最得票的答案是:

# Re-declare the queue with passive flag
res = channel.queue_declare(
        callback=on_callback,
        queue="test",
        durable=True,
        exclusive=False,
        auto_delete=False,
        passive=True
    )

这对我非常有帮助,但是它有一个严重的警告。根据pika文档,passive标志用于“仅检查队列是否存在”。因此,在可能出现队列未声明的情况下,可以使用queue_declare函数和passive标志来检查队列是否存在。经过我的测试,如果您使用passive标志调用此函数并且队列不存在,则api不仅会抛出异常;它还会导致代理断开您的通道,因此即使您优雅地捕获异常,也已经失去了与代理的连接。我使用两个不同的Python脚本针对在minikube中运行的普通vanilla RabbitMQ容器进行了测试。我进行了多次测试,每次都得到相同的结果。

我的测试代码:

import logging
import pika

logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)


def on_callback(msg):
    logger.info(f"Callback msg: {msg}")


queue_name = "testy"

credentials = pika.PlainCredentials("guest", "guest")

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)

logger.info("Connection established")

channel = connection.channel()

logger.info("Channel created")

channel.exchange_declare(exchange="svc-exchange", exchange_type="direct", durable=True)

response = channel.queue_declare(
    queue=queue_name, durable=True, exclusive=False, auto_delete=False, passive=True
)

logger.info(f"queue_declare response: {response}")

channel.queue_delete(queue=queue_name)

connection.close()

输出:
INFO:__main__:Connection established
INFO:__main__:Channel created
WARNING:pika.channel:Received remote Channel.Close (404): "NOT_FOUND - no queue 'testy' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x1047e2700> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
Traceback (most recent call last):
  File "check_queue_len.py", line 29, in <module>
    response = channel.queue_declare(
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2521, in queue_declare
    self._flush_output(declare_ok_result.is_ready)
  File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1354, in _flush_output
    raise self._closing_reason  # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'testy' in vhost '/'")

当我将 passive 设置为 False 时:

scripts % python check_queue_len.py
INFO:__main__:Connection established
INFO:__main__:Channel created
INFO:__main__:queue_declare response: <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=testy'])>"])>

如果我在这里漏掉了什么,请告诉我。


0

虽然我来晚了,但这是一个使用pyrabbitpyrabbit2从AWS AmazonMQ获取队列计数的示例,可以在RabbitMQ上工作,并支持HTTPS:

from pyrabbit2.api import Client

cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
if not cl.is_alive():
    raise Exception("Failed to connect to rabbitmq")

for i in cl.get_all_vhosts():
    print(i['name'])

queues = [q['name'] for q in cl.get_queues('/')]
print(queues)    

itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
print(itemCount)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接