从RabbitMQ获取消息而不消费它们

3

我想在不消费它们的情况下获取Rabbitmq队列中存在的所有消息的副本。这是否可能? 谢谢。


3个回答

阿里云服务器只需要99元/年,新老用户同享,点击查看详情
5
我想获取Rabbitmq队列中的消息副本,但不想将它们消费掉。这可行吗? 答案:不行。您最接近的选项是消费或获取一条消息,然后通过负面确认拒绝它。

注意: RabbitMQ团队监控 rabbitmq-users 邮件列表,有时会在StackOverflow上回答问题。


2
也许您可以注册一个消费者(如官方文档中所示here),而不需要向代理服务器发送确认信息no_ack=True。将no_ack设置为True即可。
channel.basic_consume(callback, queue='hello', no_ack=True)
这样,您的消费者可以接收到消息内容,但是经纪人没有将消息标记为已发送(并且在您的消费者退出时返回Ready状态)。 也许这不是最干净的方法来完成您需要的工作,但它有效且简单。 另一种(但相似的)方法是采用所谓的pull API(与您注册订阅者时使用的push API相反);我在一个.Net应用程序中使用了这种方法:您可以在此处找到.Net文档,我认为Python APIs在这方面也是类似的。 关键思想是在不给出确认的情况下获取消息:channel.BasicGet(queueName, noAck) 我希望这能帮助您朝着完整可靠的解决方案迈进!

这种方法存在问题,因为它将消息放置在“未确认”状态,直到消费者死亡。 - Luke Bakken
“no_ack”参数现在改为“auto_ack”,具有相反的逻辑,并在“basic_consume()”中默认设置为False。 - xCovelus

2

我发现一种更好的方法可以使用channel.basic_get()函数获取队列上的所有消息,如以下代码所示:

    def __init__(self):
        self.host = ConfigTools().get_attr('host')
        self.waiting_queue = ConfigTools().get_attr('test_queue_name')

    def view_queue(self) -> list:
        """Reads everything from the queue, then disconnects, causing the server to requeue the messages
        Returning the delivery tag is pointless at this point because the server makes the tag (an integer) up on
        the fly and the order can shuffle constantly"""
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))

        msgs = []
        while True:
            chl = connection.channel()
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print("body : ", body)
                msgs.append(body)
            else:
                print("No more messages returned")
                connection.close()
                break
        return msgs

那么,如果我随时知道要从队列中弹出哪个消息,可以使用类似以下的方法:

    def remove(self, item) -> list:
        """Removes the item from the queue. Goes through the entire queue, similar to view_queue, and acknowledges
        the msg in the list that matches, and returns the msg.
        If item matches more than one message on the queue, only one is deleted
        """

        if isinstance(item, list):
            if not (isinstance(i, bytes) for i in item):
                print("Item must be a list of only byte objects")
        if not isinstance(item, bytes):
            print("Item must be a singe bytes object")
            raise TypeError

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self.host))

        msgs = []
        while True:
            chl = connection.channel()
            method_frame, header_frame, body = chl.basic_get(queue='test')
            if method_frame:
                print('body: ', body)
                if body == item:
                    print("item found!")
                    msgs.append(body)
                    chl.basic_ack(method_frame.delivery_tag)
                    connection.close()
                    return msgs

            else:
                print("Message not found")
                connection.close()
                break
        return msgs

注意:我将其用于小型应用程序——队列上不超过50条消息。我无法保证该函数在大型应用程序中的表现。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,