如何阅读RabbitMQ未确认的消息 / RabbitMQ循环

3
我想读取RabbitMQ队列中未确认消息的负载或messageId。这是可能的吗?
我之所以要这样做,是因为我尝试使用RabbitMQ死信功能来构建一个周期性地自动生成消息的循环。简而言之,创建两个队列 - 工作队列和延迟队列。
1. 将延迟队列中的消息的TTL设置为需要周期性地生成的时间频率。可以针对不同的任务目的使用不同的TTL来发送不同的消息。
2. 向延迟队列中放入一条消息。当消息过期时,它将被重新发布到工作队列中。该消息可以在工作队列中停留任意长的时间,直到有消费者准备好消费它。
3. 一个消费者接收消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如线程崩溃),则不进行确认。然后该消息会自动重新出现在工作者队列中。此时,另一个消费者就可以接手这项工作。当回传到延迟队列的消息再次过期时,它将被重新发布,并再次被消费者消费……这样就构建了一个周期循环,实现了工作负载分配。
我想确保在这个循环中没有遗漏或重复的消息,因为我不想错过任务或同时做两次任务。然而,重复的消息可能会发生。下面显示消费者首先将消息写回延迟队列,然后确认工作队列。如果线程在下面两行之间崩溃,该消息将位于延迟队列中,并且Rabbit会将消息再次发布到工作队列中。这可能会导致循环中出现重复的消息。
  channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

为了避免上述问题,我想在上面两行代码后添加一段狗看守逻辑:
  1. 检查循环中消息的总数(两个队列中的消息总数),看是否等于我的预期数字(我预期的数字要少10个);

  2. 如果数量不匹配,我希望找出哪些消息是缺失的或者重复的,并进行处理。我并不关心这些消息的顺序,或者频率已经被干扰了,因为这是一个非常极端的情况需要考虑。我可以轻松地获取那些准备好的消息并重新排队,但问题是如何处理那些未被确认的消息?

非常感谢您的帮助!
Roy
1个回答

2

无法从其他上下文读取未确认的消息,原始消息已被消耗并保留为未确认状态。


非常感谢您的回答。我在我的原始问题中添加了更多细节。您能否请看一下?再次感谢! - Roy Wu
1
关于每条消息的TTL,请注意,无论每条消息的TTL如何,它们只有在到达队列头部时才会被移出队列(然后如果设置了DLX,则会死信)。 - pinepain
1
同时,基于消息计数的想法可能无法奏效,因为很难预测多个消费者和发布者的行为。只需构建具有重复容错流程即可。如需进一步讨论,建议在RabbitMQ官方用户组上提出此问题(首先搜索类似的问题)。 - pinepain
@PankajNimgade 如果您没有提供任何信息,那么很难帮助您。如果您是RabbitMQ的新手,则RabbitMQ官方网站文档上有很多文档,如果您有一些具体的问题可以在这里或RabbitMQ官方用户组中发帖说明。顺便说一句,在SO上这样的评论有点离题。 - pinepain
@zaq178miami,我明白你的意思,我会尝试在群里问这个问题。我之前也曾在这里提问过,链接是stackoverflow.com/questions/26996141/error-in-loading-send-and-recv-in-rabbtimq - Pankaj Nimgade
显示剩余2条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接