Spring AMQP - 重复消息

3
我正在处理一个高流量的数据流,每秒大约有500个消息。这些数据通过Spring AMQP+Rabbit被消费,使用了一个带有10个并发消费者的SimpleMessageListenerContainer。我需要每15分钟对数据库进行一些检查,并重新加载某些属性以进行处理。这是通过一个quartz触发器完成的,该触发器每15分钟触发一次,停止SimplelistenerContainer,执行必要的工作,然后再次启动容器。

当应用程序启动时,一切都运行得很完美。当触发器触发并重新启动容器时,我会看到同一条消息被传递多次,这会导致很多重复。消费者没有抛出任何异常。

消息监听器

 class RoundRobinQueueListener implements MessageListener {

@Override
public void onMessage(Message message) { //do processing
     }

  }

在应用程序启动期间设置并行消费者并启动消费者。
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
       RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
        messageListenerContainer.setQueueNames(queueName);
        messageListenerContainer.setMessageListener(roundRobinListener);
        messageListenerContainer.setConcurrentConsumers(10);
        messageListenerContainer.setChannelTransacted(true);

石英触发器
    void execute(JobExecutionContext context) throws JobExecutionException {
    messageListenerContainer.stop()
    //Do db task, other processing
    messageListenerContainer.start()
    }

嗨winash,我们需要查看一些代码和配置来帮助解决这个问题。相关的Spring配置是一个很好的起点,以及Quartz作业代码。 - C. Ross
编辑了原问题以添加代码示例,这大致上是代码的结构。 - winash
经过一些调试,发现错误在于我的流初始化,我错误地责怪了兔子,感谢您的帮助。 - winash
1个回答

1

看起来您的消息现在已被消费者确认。如果您没有使用自动确认模式,您需要自己确认该消息(这也可以在 SimpleMessageListenerContainer中配置)。否则,代理会认为该消息未成功处理并尝试重新传递。


我已将通道设置为事务性,并且也没有显式地设置任何确认模式(默认为自动)。 看起来像是一个确认问题,但队列从未积累,即消息确实被处理了。 - winash
交易是否正确提交?在 RMQ 中启用调试日志,并检查消息是否已出列。 - mbelow
@meblow在我的原始问题下添加了一条评论,感谢您的建议。 - winash

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接