如何从非本连接自己的通道中恢复未确认的AMQP消息?

53

似乎我保持我的rabbitmq服务器运行得越久,未确认的消息就会越多。我希望重新排队它们。实际上似乎有一个amqp命令可以做到这一点,但它只适用于您的连接所使用的通道。我构建了一个小的pika脚本来尝试它,但我要么遗漏了什么,要么不能通过这种方式完成(那 rabbitmqctl 呢?)

import pika

credentials = pika.PlainCredentials('***', '***')
parameters = pika.ConnectionParameters(host='localhost',port=5672,\
    credentials=credentials, virtual_host='***')

def handle_delivery(body):
    """Called when we receive a message from RabbitMQ"""
    print body

def on_connected(connection):
    """Called when we are fully connected to RabbitMQ"""
    connection.channel(on_channel_open)    

def on_channel_open(new_channel):
    """Called when our channel has opened"""
    global channel
    channel = new_channel
    channel.basic_recover(callback=handle_delivery,requeue=True)    

try:
    connection = pika.SelectConnection(parameters=parameters,\
        on_open_callback=on_connected)    

    # Loop so we can communicate with RabbitMQ
    connection.ioloop.start()
except KeyboardInterrupt:
    # Gracefully close the connection
    connection.close()
    # Loop until we're fully closed, will stop on its own
    connection.ioloop.start()

你已经解决了这个问题吗? - 13hsoj
这个stackoverflow的答案可能会有你需要的内容,具体取决于为什么你还有其他通道挂着未确认的消息。这些是僵尸通道。这不是重复问题,因为这个话题是关于其他通道中的消息,而不是通道本身。 - Gerard ONeill
3个回答

82

未被确认的消息是已经传递到消费者的网络消息,但尚未被ack'ed或rejected - 但该消费者尚未关闭最初接收它们的通道或连接。因此,代理无法确定消费者只是花费更长时间来处理这些消息,还是已忘记它们。因此,它将它们保留在未确认状态,直到消费者死亡或它们被ack'ed或rejected。

由于这些消息仍然可能由最初消费它们的仍存活的消费者在未来有效地处理,因此您不能(据我所知)将另一个消费者插入其中并尝试对它们进行外部决策。您需要修复消费者,在处理每个消息时作出决策,而不是让旧消息未被确认。


1
基本上,消费者必须调用basic.recover吗?我正在使用celeryd来管理连接。如果您熟悉celeryctl,可能可以将该恢复命令发送到响应不佳的队列。 - Will Olbrys
4
我对你使用 Celery 表示慰问。Celery 开发者对 AMQP 消息队列协议的理解存在问题,他们创建了一个严重有缺陷的实现。你需要做出选择:要么放弃 Celery,正确使用 AMQP;要么停止使用 Celery 中的 AMQP,改用像 Redis 这样简单的东西。我选择放弃 Celery,继续使用 AMQP。 - Michael Dillon
6
这是一个相当严重的控诉。您不介意我问一下,Celery的AMQP实现有什么没有正确执行的地方吗? - Will Olbrys
8
我同意,我想听听你为什么认为芹菜是如此糟糕的。它被广泛使用,而这是我第一次听到这样的抱怨。 - Jeremy Dunck

28

如果消息未确认,只有两种方法可以将它们放回队列:

  1. basic.nack

    这个命令将导致消息被放回队列并重新投递。

  2. 与代理断开连接

    这个操作将强制将此通道中所有未确认的消息放回队列。

注意:basic.recover会尝试在同一通道上(对同一消费者)重新发布未确认的消息,这有时是期望的行为。

RabbitMQ规范basic.recover和basic.nack


真正的问题是:为什么消息未确认?

导致消息未确认的可能情况:

  1. 消费者获取太多的消息,然后没有及时处理和确认它们。

    解决方案:预取适当数量的消息。

  2. 存在错误的客户端库(我目前在使用pika 0.9.13时遇到了这个问题)。 如果队列中有很多消息,一定数量的消息将会卡在未确认状态,甚至数小时后都无法消除未确认状态。

    解决方案:我必须多次重启消费者,直到队列中的所有未确认消息都被清除。


你的pika问题已经报告了吗?你能提供一个链接吗? - istepaniuk
这是Python递归限制的触发。大概是由于pika 0.9.13递归了超过1000次,而导致了这个问题。但在0.9.14版本中没有出现这个问题。 - IvanD
3
最终找到了问题报告的位置:https://github.com/pika/pika/issues/286 - IvanD
新版本中有此设置,以避免消费者超时。请参考:https://www.rabbitmq.com/consumers.html#acknowledgement-timeout - Rakesh Sharma
另一个可能的情况是:在RabbitMQ中发送消息和我的计算机确认它们之间,因为互联网连接出现故障。 - nsandersen

6

一旦所有的工作者/消费者停止工作,所有未确认的消息都会进入就绪状态。

请通过在 ps aux 输出上使用 grep 并查找并停止/杀死他们来确保所有工作者都已停止。

如果您正在使用 supervisor 管理工作者,并且显示工作者已停止,您可能需要检查僵尸进程。Supervisor 报告工作者已停止,但是当在 ps aux 输出中搜索时仍然会发现僵尸进程运行。杀死僵尸进程将使消息回到就绪状态。


您还可以使用RabbitMQ管理控制台来确定兔子连接是否被僵尸进程阻塞,就像我在这里描述的那样:https://dev59.com/rGct5IYBdhLWcg3wpfBc#43026774 - Shlomi Uziel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接