这是一个简单的问题,但谷歌或Pika开源代码都没有提供帮助。是否有一种方法可以查询当前在Pika中队列的大小(项目计数器)?
我知道这个问题有点旧了,但是这里有一个使用pika实现的例子。
关于AMQP和RabbitMQ,如果您已经声明了队列,您可以使用passive标志重新声明队列,并保持所有其他队列参数不变。对此声明的响应declare-ok将包括队列中消息的数量。
以下是使用pika 0.9.5的示例:
import pika
def on_callback(msg):
print msg
params = pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=pika.credentials.PlainCredentials('guest', 'guest'),
)
# Open a connection to RabbitMQ on localhost using all default parameters
connection = pika.BlockingConnection(parameters=params)
# Open the channel
channel = connection.channel()
# Declare the queue
channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False
)
# ...
# Re-declare the queue with passive flag
res = channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False,
passive=True
)
print 'Messages in queue %d' % res.method.message_count
这将打印以下内容:
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
<Method(['frame_type=1', 'channel_number=1', "method=<Queue.DeclareOk(['queue=test', 'message_count=0', 'consumer_count=0'])>"])>
Messages in queue 0
你可以从message_count
成员获得消息数量。以下是使用pika获取队列长度的方法(假设您正在使用默认用户和密码在本地主机上)请将q_name替换为您的队列名称。
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
q = channel.queue_declare(q_name)
q_len = q.method.message_count
我发这篇文章只是为了让其他人看到这个讨论。最得票的答案是:
# Re-declare the queue with passive flag
res = channel.queue_declare(
callback=on_callback,
queue="test",
durable=True,
exclusive=False,
auto_delete=False,
passive=True
)
这对我非常有帮助,但是它有一个严重的警告。根据pika文档,passive
标志用于“仅检查队列是否存在”。因此,在可能出现队列未声明的情况下,可以使用queue_declare函数和passive
标志来检查队列是否存在。经过我的测试,如果您使用passive
标志调用此函数并且队列不存在,则api不仅会抛出异常;它还会导致代理断开您的通道,因此即使您优雅地捕获异常,也已经失去了与代理的连接。我使用两个不同的Python脚本针对在minikube中运行的普通vanilla RabbitMQ容器进行了测试。我进行了多次测试,每次都得到相同的结果。
我的测试代码:
import logging
import pika
logging.basicConfig(level="INFO")
logger = logging.getLogger(__name__)
logging.getLogger("pika").setLevel(logging.WARNING)
def on_callback(msg):
logger.info(f"Callback msg: {msg}")
queue_name = "testy"
credentials = pika.PlainCredentials("guest", "guest")
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost", port=5672, credentials=credentials)
)
logger.info("Connection established")
channel = connection.channel()
logger.info("Channel created")
channel.exchange_declare(exchange="svc-exchange", exchange_type="direct", durable=True)
response = channel.queue_declare(
queue=queue_name, durable=True, exclusive=False, auto_delete=False, passive=True
)
logger.info(f"queue_declare response: {response}")
channel.queue_delete(queue=queue_name)
connection.close()
INFO:__main__:Connection established
INFO:__main__:Channel created
WARNING:pika.channel:Received remote Channel.Close (404): "NOT_FOUND - no queue 'testy' in vhost '/'" on <Channel number=1 OPEN conn=<SelectConnection OPEN transport=<pika.adapters.utils.io_services_utils._AsyncPlaintextTransport object at 0x1047e2700> params=<ConnectionParameters host=localhost port=5672 virtual_host=/ ssl=False>>>
Traceback (most recent call last):
File "check_queue_len.py", line 29, in <module>
response = channel.queue_declare(
File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2521, in queue_declare
self._flush_output(declare_ok_result.is_ready)
File "/Users/dbailey/dev/asc-service-deployment/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1354, in _flush_output
raise self._closing_reason # pylint: disable=E0702
pika.exceptions.ChannelClosedByBroker: (404, "NOT_FOUND - no queue 'testy' in vhost '/'")
当我将 passive
设置为 False 时:
scripts % python check_queue_len.py
INFO:__main__:Connection established
INFO:__main__:Channel created
INFO:__main__:queue_declare response: <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=testy'])>"])>
如果我在这里漏掉了什么,请告诉我。
虽然我来晚了,但这是一个使用pyrabbit或pyrabbit2从AWS AmazonMQ获取队列计数的示例,可以在RabbitMQ上工作,并支持HTTPS:
from pyrabbit2.api import Client
cl = Client('b-xxxxxx.mq.ap-southeast-1.amazonaws.com', 'user', 'password', scheme='https')
if not cl.is_alive():
raise Exception("Failed to connect to rabbitmq")
for i in cl.get_all_vhosts():
print(i['name'])
queues = [q['name'] for q in cl.get_queues('/')]
print(queues)
itemCount = cl.get_queue_depth('/', 'event.stream.my-api')
print(itemCount)