我正在处理一个高流量的数据流,每秒大约有500个消息。这些数据通过Spring AMQP+Rabbit被消费,使用了一个带有10个并发消费者的SimpleMessageListenerContainer。我需要每15分钟对数据库进行一些检查,并重新加载某些属性以进行处理。这是通过一个quartz触发器完成的,该触发器每15分钟触发一次,停止SimplelistenerContainer,执行必要的工作,然后再次启动容器。
在应用程序启动期间设置并行消费者并启动消费者。
石英触发器
当应用程序启动时,一切都运行得很完美。当触发器触发并重新启动容器时,我会看到同一条消息被传递多次,这会导致很多重复。消费者没有抛出任何异常。
消息监听器
class RoundRobinQueueListener implements MessageListener {
@Override
public void onMessage(Message message) { //do processing
}
}
在应用程序启动期间设置并行消费者并启动消费者。
final SimpleMessageListenerContainer messageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
RoundRobinQueueListener roundRobinListener = RoundRobinQueueListener.class.newInstance();
messageListenerContainer.setQueueNames(queueName);
messageListenerContainer.setMessageListener(roundRobinListener);
messageListenerContainer.setConcurrentConsumers(10);
messageListenerContainer.setChannelTransacted(true);
石英触发器
void execute(JobExecutionContext context) throws JobExecutionException {
messageListenerContainer.stop()
//Do db task, other processing
messageListenerContainer.start()
}