如何在RabbitMQ中重新排队消息。

45

消费者接收到消息后,进行一些验证操作,然后调用Web服务。在这个阶段,如果出现任何错误或验证失败,我们希望将消息放回最初消费的队列中。

我已经阅读了RabbitMQ文档。但我对拒绝(reject)、否定(nack)和取消(cancel)方法之间的区别感到困惑。

1个回答

75

简短回答:

要重新排队特定的消息,您可以选择同时设置multiple标志为false的basic.rejectbasic.nack

如果您正在使用消息确认,并且在消费者上有未确认的消息,并且消费者在特定时间退出而不确认它们,那么basic.consume调用也可能导致消息被重新传递。

basic.recover将在特定通道上重新发送所有未确认的消息。

详细回答:

basic.rejectbasic.nack都用于同一目的 —— 丢弃或重新排队不能由特定消费者处理的消息(在给定时刻、在某些条件下或根本无法处理)。它们之间的主要区别是basic.nack支持批量消息处理,而basic.reject不支持。

这个差异在官方RabbitMQ网站上的Negative Acknowledgements文章中有描述:
AMQP规范定义了basic.reject方法,允许客户端拒绝单个已传递的消息,并指示代理将其丢弃或重新排队。不幸的是,basic.reject不支持批量否认消息。
为解决这个问题,RabbitMQ支持basic.nack方法,提供了basic.reject的所有功能,同时还允许批量处理消息。
要批量拒绝消息,客户端将basic.nack方法的multiple标志设置为true。然后,代理将拒绝所有未确认的、已传递的消息,直到并包括在basic.nack方法的delivery_tag字段中指定的消息。在这方面,basic.nack补充了basic.ack的批量确认语义。

请注意,basic.nack方法是RabbitMQ特定的扩展,而basic.reject方法是AMQP 0.9.1规范的一部分。

至于basic.cancel方法,它用于通知服务器客户端停止消息消费。请注意,客户端可能会在发送basic.cancel方法并接收到cancel-ok回复之间接收任意数量的消息。如果客户端使用消息确认,并且有任何未确认的消息,则这些消息将被移回到最初从中消耗它们的队列中。

basic.recover在RabbitMQ中有一些限制: - basic.recover with requeue=false - basic.recover synchronicity

除了勘误之外,根据RabbitMQ规范 basic.recover有部分支持(不支持重新排队恢复)。

关于basic.consume的注意事项:

When basic.consume is started without auto-ack (no­ack=false) and there are pending non-acked messages, if the consumer is canceled (due to death, fatal error, exception, etc.), the pending messages will be redelivered. These pending messages will not be processed (even dead-lettered) until the consumer releases them (ack/nack/reject/recover). Only after they have been released will they be processed (e.g., dead-lettered).
For example, let's say we originally posted 5 messages in a row:
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

然后消费其中3个,但不确认它们,然后取消消费者。我们将面临这种情况:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

星号(*)表示 redelivered 标志设置为 true

假设我们有一个设置了死信交换机和死信消息队列的情况。

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

假设我们发布了5条消息,并将expire属性设置为5000(即5秒):

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

然后我们从 main 队列中消费了3条消息并将它们保留10秒:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

感叹号 (!) 代表未确认的消息。这些消息无法传递给任何消费者,通常也无法在管理面板中查看。但是如果取消消费者,请记住它仍然保留了3个未确认的消息:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

现在,之前在头部的三个消息已经被放回到原始队列中,但由于它们都设置了每条消息的生存时间(TTL),它们被转移到死信队列的尾部(当然是通过死信交换机)。

P.S.:

消费消息,也就是监听新消息,与直接访问队列(获取一个或多个消息而不必关心其他消息)有所不同。更多信息请参见basic.get方法的描述。


basic.nack 是 RabbitMQ 特有的扩展。 - pinepain
“RabbitMQ-specific extension”指的是除扩展之外的部分都没有使用吗?也就是说,如果我使用Spring AMQP与RabbitMQ集成,那么我不能使用basic.nack吗? - Glide
我的意思是basic.nack不是AMQP标准的一部分,而是RabbitMQ特定的扩展(我已经在回答中添加了注释,以便未来的读者能够看到它)。如果你使用RabbitMQ作为AMQP代理,你可以使用basic.nack代替basic.reject - pinepain
basic.consume 的调用也可能导致消息重新投递,如果您正在使用消息确认并且在特定时间消费者上存在未确认的消息。这个说法正确吗?据我所知,如果没有收到 ACK,消息将等待直到达到 TTL。 - NeoWang
1
@NeoWang,是的,我可能是指如果basic.consume在没有自动确认(no-ack=false)的情况下启动,并且有一些未确认的挂起消息,则当消费者被取消(死亡、致命错误、异常等)时,这些挂起的消息将被重新传递。从技术上讲,这些挂起的消息不会被处理(甚至不会被转到死信队列),直到消费者释放它们(确认/否认/拒绝/恢复)。只有在此之后,它们才会被处理(例如,转到死信队列)。但感谢您指出原始答案中的这种模棱两可之处,我将添加此说明和示例。 - pinepain
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接