如何在RabbitMQ中设置重试尝试次数?

99

我正在使用RabbitMQ,并且有一个保存电子邮件消息的队列。我的消费者服务出列消息并尝试发送它们。如果由于任何原因,我的消费者无法发送消息,我希望重新将消息排队以再次发送。

我知道我可以执行basicNack并将requeue标志设置为true,但是,如果我们的电子邮件系统出现问题,我不想无限期地重新排队未发送的消息。我想定义一个有限次数的消息重排队次数,以便再次发送。

我无法在电子邮件消息对象上设置字段,但是当我出列并发送nack时。更新后的字段在队列中的消息上不存在。

是否有其他方法可以解决这个问题?


1
这篇博客文章很好地解释了为什么你想在.NET中使用类似NServiceBus或MassTransit的框架来解决这些问题,以便在RabbitMQ上进行操作:https://www.make-awesome.com/2017/12/sure-you-can-just-use-rabbitmq/ - Udi Dahan
@UdiDahan 在Java世界中是否有类似于MassTransit/NServiceBus的替代方案? - Datz
1
@Datz 我觉得最接近的东西应该是Apache Caml。 - Udi Dahan
@Datz - 如果仍然相关,请参考JAVA:https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit - Roee Gavirel
5个回答

109

基于quorum队列的毒消息处理方式的2023年更新:

Quorum队列跟踪不成功的投递尝试次数,并在任何重新投递的消息中包含" x-delivery-count "头来公开它。

他们还添加了支持限制重新投递的策略:

可以使用策略参数delivery-limit为队列设置投递限制。

因此,鉴于这些新的添加(和经典队列的弃用),下面的原始答案可能不再适用于最新的RabbitMQ版本。

原始答案(在添加队列和流队列之前):

RabbitMQ(以及AMQP协议)中没有像重试尝试这样的功能。

实现重试尝试限制行为的可能解决方案:

  1. 如果消息之前没有被重新发送(检查basic.deliver方法的redelivered参数-您的库应该有一些接口来处理此操作),请重新发送消息,然后将其丢弃并在死信交换机中捕获,然后进行某些处理。

  2. 每次无法处理消息时,请再次发布消息,但设置或增加/减少标题字段,例如x-redelivered-count(您可以选择任何名称)。在这种情况下,为了控制重新传递,您必须检查所设置的字段是否达到某个限制(顶部或底部-0是我的选择,类似于tcp / ip中的ttl)。

  3. 将消息唯一键(例如uuid,但必须在发布消息时手动设置)存储在Redis、memcache或其他存储中,甚至在mysql中,同时还要存储重新传递计数,然后在每次重新传递时增加/减少此值,直到达到限制。

  4. (对于真正的极客)编写插件以实现所需的行为。

#3的优点是重新投递的消息会留在队列头部。如果您有一个长的队列或者消息顺序对您很重要,这一点非常重要(请注意,重新投递会打破严格的消息顺序,请参阅官方文档或SO上的此问题了解详细信息)。

P.S.:

在这个话题中有相似的答案,但是是关于php的。仔细阅读一下,或许可以帮助你一点(从"There are multiple techniques to deal with cycle redeliver problem"开始阅读)。

非常感谢。我想我会选择死信交换机路线。 - user2689570
6
如果您不介意限制消息的生命周期而不是限制重试次数,可以设置TTL。 - Víctor Romero
目前似乎无法编写插件。Rabbit中没有行为可以让您钩入消息消费并修改消息。您只能在发布期间使用rabbit_channel_interceptor修改消息头,但这对此处没有帮助。 - fafl

27

虽然这是一个老问题,但我认为现在可以通过死信交换和在消息死信化后添加的x-death头部数组的组合来轻松实现:

死信化过程会向每个死信消息的标头中添加名为 x-death 的数组。该数组包含由 {queue, reason} 对标识的每个死信事件的条目。每个此类条目都是一个表格,由几个字段组成:

queue:消息死信化之前所在的队列名称

reason:死信化原因,详见下方

time:消息死信化的日期和时间,作为 64 位 AMQP 0-9-1 时间戳

exchange:消息发布到的交换机(注意,如果消息被死信化多次,则此处将是死信交换机)

routing-keys:消息发布时使用的路由键(包括 CC 键,但不包括 BCC 键)

count:此消息在此队列中由于此原因被死信化的次数

original-expiration(如果消息由于每条消息的 TTL 而被死信化):消息的原始过期属性。为了防止消息在路由到任何队列时再次过期,在消息死信化时会将过期属性从消息中删除。

请阅读这篇精彩的文章以获取更多信息

查看这个图表:

输入图像描述


也许我理解错了,但是标记为“失败(超过最大重试次数)”的分支不应该标记为“ack”吗?文章似乎在“选项2:拒绝+DLX拓扑”的示例下暗示了这一点,我认为这是指的。如果在超过最大计数时执行nack操作,那么所有消息将永远发送到DLX,对吗? - ollien
1
@ollien nack 可用于重新排队消息或丢弃(如果有死信),第三个参数 requeue 控制此行为。https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.nack 。我更倾向于在这里使用 nack 而不是 ack,因为从语义上讲,它更相关,表示消费失败并且消息未被正确处理。 - Mohammed Essehemy
是的,但是当你NACK它时,它会进入DLX,对吧?换句话说,如果在重试次数达到后再NACK,它将永远无法跳出重试循环。 - ollien

9

您应该使用 Quorum 队列类型。该队列类型具有一个传递限制参数,用于指定在删除消息之前尝试传递消息的次数。

创建队列时,请指定以下参数:

x-queue-type: quorum
x-delivery-limit: 3 // it means rabbitmq will make 3 attempts to deliver a message before deleting it 

1

在尝试了多种解决方案之后(包括其他答案中的解决方案,受到Erez Rabih帖子的启发),我想出了一种使用DLX和Delayed Exchange的机制。并写了this post来进行解释。


需求

为了实现完美的重试机制,您需要:

  1. 重新排队在原始队列尾部(不要锁定它)
  2. 能够限制重试次数(不要无限重试)
  3. 能够等待一段时间后再重试(提高重试成功率)
  4. (额外要求:能够控制重试之间的延迟)

一个语义化、简单而又强大的解决方案

这是我在各种项目中想出来的:

enter image description here

它使用了以下方法:

  • 工作队列上的简单、语义化的Dead Letter Exchange设置(只需nack消息而不重新排队)
  • 一个带有简单消费者的'error-queue',计算下一个重试延迟
  • Delayed Exchange 'wait-exchange', 其中'error-queue' 使用自定义x-delay进行publish
  • 绑定到'wait-exchange'的'requeue-queue',其中消费者负责将消息重新排队到原始队列

您可以阅读this post,了解更多信息。


0
从我的角度来看,这里更好的想法是在消费者内部实现死信交换和重试逻辑的组合。如果消费者无法处理消息,则将消息放入DeadLetterQueue。
下面您可以找到使用node-amqp和Rabbitmq实现的死信交换原型https://github.com/kharandziuk/dead-letter-exchange-prototype

简单来说,如果您打算缩短重试延迟并减少总尝试次数,则此方法可能最有效。但对于大型应用程序而言,消费者的 RAM 中等待重试的消息会增加 RAM 的使用量(或者如果您限制并发处理的消息数量以防止过多的 RAM 使用,则吞吐量会降低)。 - Alexander Taylor

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接