RabbitMQ - 清空一个队列中所有未被确认的消息

58

我在开发环境中有数千条未确认消息,但无法重启。
是否有一种方法可以删除(清除)所有消息,即使它们未被确认?

8个回答

71

关闭未确认消息所在的通道,这将使它们被nack回队列,然后调用purge。


你如何找到未确认消息所在的通道? - grayaii
6
在你的 RabbitMQ 控制台中,如果你点击带有未确认消息的队列,那么会显示一个消费者部分,在其中你会找到与之相关的通道。 - Kishan Mehta
谢谢!这对我有用:我停止了队列的消费者,然后消息移动到桶中,然后我就能用清除按钮将它们删除了。 - eby
你如何“关闭通道”? - Gerald Murphy

23

您必须让消费者 ack 它们(或 nack),只有在此之后它们才会被移除。或者,您可以关闭消费者并完全清空队列。

如果您正在寻找一种方式来清除所有未确认的消息,则AMQP协议和RabbitMQ都没有这样的功能。

看起来您的消费者是问题的原因,因此您必须对其进行调整(重写),以便在处理或失败后立即释放消息。


AMQP 中有队列清除功能:http://www.rabbitmq.com/amqp-0-9-1-reference.html#queue.purge - old_sound
6
不会清除未确认的消息。从queue.purge方法文档块中可以看到:此方法从队列中删除所有未等待确认的消息,这是严格的AMQP协议实现。 - pinepain
@pinepain 我花了一些时间编写代码来查找带有消息的队列。然后调用purge命令,再在循环中等待,但是队列从未清空。感谢您的澄清。我需要重新考虑我的方法。 - Trevor Boyd Smith

3

如果队列中没有“准备就绪”的消息,删除并重新创建。


2

使用此方法将会丢失队列内容。

在清除消息之前,您需要将其放回队列中:

  • 关闭通道
  • 关闭连接(该脚本对我不起作用)

作为替代方案,这不需要等待:

  • 删除并重新创建队列
  • 重启服务器

13
然后被解雇。 - Aniket Inge

1
你需要调用 basic.recover 方法,强制所有未确认的消息重新进入失败的通道。请注意有关此函数的勘误,指定 RabbitMQ 仅支持重新排队模式。

1

出现这种情况的原因之一是由于处理错误,导致消费者被卡在重复处理相同的消息中。在这种情况下,RabbitMQ队列管理界面可能会将这些消息显示为未确认,但实际上它们正在从队列中读取并处理(到故障点),然后以快速的速度重新排队(以便进行重试) -- 也许每秒数千次

在此循环期间,这些消息短暂地处于就绪状态,但立即被您的应用程序删除 -- 然后循环开始。例如,Spring AMQP默认的自动重新排队行为就是这种示例。

由于这些消息永远不会保留在就绪状态下,因此管理界面的获取消息按钮不太可能起作用。如果您可以访问队列,则可以运行一个单独的自定义消费者实例,可能是本地的,但具有特定的目的,即删除并不重新排队所涉及的消息。

通过 RabbitMQ 的 公平调度 机制,你的额外消费者很可能会收到相关的消息并有机会执行你自定义的处理逻辑。
你甚至可以编写一个自定义实用程序,用于过滤、分析或死信感兴趣的消息。

0

针对软件开发人员,请使用以下代码。

channel.purgeQueue(queue-name);

如果我们使用这段代码,队列将被清空并存在相同的队列。

-3

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接