我想读取RabbitMQ队列中未确认消息的负载或messageId。这是可能的吗?
我之所以要这样做,是因为我尝试使用RabbitMQ死信功能来构建一个周期性地自动生成消息的循环。简而言之,创建两个队列 - 工作队列和延迟队列。
1. 将延迟队列中的消息的TTL设置为需要周期性地生成的时间频率。可以针对不同的任务目的使用不同的TTL来发送不同的消息。
2. 向延迟队列中放入一条消息。当消息过期时,它将被重新发布到工作队列中。该消息可以在工作队列中停留任意长的时间,直到有消费者准备好消费它。
3. 一个消费者接收消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如线程崩溃),则不进行确认。然后该消息会自动重新出现在工作者队列中。此时,另一个消费者就可以接手这项工作。当回传到延迟队列的消息再次过期时,它将被重新发布,并再次被消费者消费……这样就构建了一个周期循环,实现了工作负载分配。
我想确保在这个循环中没有遗漏或重复的消息,因为我不想错过任务或同时做两次任务。然而,重复的消息可能会发生。下面显示消费者首先将消息写回延迟队列,然后确认工作队列。如果线程在下面两行之间崩溃,该消息将位于延迟队列中,并且Rabbit会将消息再次发布到工作队列中。这可能会导致循环中出现重复的消息。
为了避免上述问题,我想在上面两行代码后添加一段狗看守逻辑:
Roy
我之所以要这样做,是因为我尝试使用RabbitMQ死信功能来构建一个周期性地自动生成消息的循环。简而言之,创建两个队列 - 工作队列和延迟队列。
1. 将延迟队列中的消息的TTL设置为需要周期性地生成的时间频率。可以针对不同的任务目的使用不同的TTL来发送不同的消息。
2. 向延迟队列中放入一条消息。当消息过期时,它将被重新发布到工作队列中。该消息可以在工作队列中停留任意长的时间,直到有消费者准备好消费它。
3. 一个消费者接收消息并处理它。如果处理成功,消费者需要确认工作队列,然后将消息写回延迟队列;如果处理失败(例如线程崩溃),则不进行确认。然后该消息会自动重新出现在工作者队列中。此时,另一个消费者就可以接手这项工作。当回传到延迟队列的消息再次过期时,它将被重新发布,并再次被消费者消费……这样就构建了一个周期循环,实现了工作负载分配。
我想确保在这个循环中没有遗漏或重复的消息,因为我不想错过任务或同时做两次任务。然而,重复的消息可能会发生。下面显示消费者首先将消息写回延迟队列,然后确认工作队列。如果线程在下面两行之间崩溃,该消息将位于延迟队列中,并且Rabbit会将消息再次发布到工作队列中。这可能会导致循环中出现重复的消息。
channel.basicPublish(DELAY_EXCHANGE, "", null, message.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
为了避免上述问题,我想在上面两行代码后添加一段狗看守逻辑:
检查循环中消息的总数(两个队列中的消息总数),看是否等于我的预期数字(我预期的数字要少10个);
如果数量不匹配,我希望找出哪些消息是缺失的或者重复的,并进行处理。我并不关心这些消息的顺序,或者频率已经被干扰了,因为这是一个非常极端的情况需要考虑。我可以轻松地获取那些准备好的消息并重新排队,但问题是如何处理那些未被确认的消息?
Roy