使用ActiveMQ创建持久化主题和订阅者的Spring Boot JMS

7
我需要创建一个ActiveMQ的主题和持久订阅者,但我不知道在哪里指定。我能够创建主题并消费消息,但当我关闭订阅者,然后继续发送消息并再次打开订阅者时,它将无法读取它们。
以下是我的进展情况:
发送消息:
    JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
    jmsTemplate.setPubSubDomain(true);
    jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    jmsTemplate.setDeliveryPersistent(true);
    jmsTemplate.convertAndSend("venta.topic",venta);

收到消息:

@JmsListener(destination = "venta.topic",id = "comercial",subscription = "venta.topic")
public void receiveMessage(Venta venta) {
    logger.log(Level.INFO, "RECEIVED : {0}",venta);      
    repository.save(venta);
}

我已阅读了此文章,并且理解我需要创建持久性订阅者。我还阅读了Spring文档,我认为它与DefaultJmsListenerContainerFactory有关(我没有实现,而是使用默认配置),文档显示:
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDestinationResolver(destinationResolver());
    factory.setConcurrency("3-10");
    return factory;
}

但是我似乎找不到创建持久会话的地方。我的生产者和订阅者都连接到一个独立的ActiveMQ二进制文件。希望你能帮助我,谢谢。
3个回答

11

如前面的答案所指出的那样,有必要在工厂中设置客户端ID和持久化订阅:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDestinationResolver(destinationResolver());
    factory.setConcurrency("3-10");
    factory.setClientID("brokerClientId");
    factory.setSubscriptionDurable(true);
    return factory;
}

但仅此还不足以将客户端注册为持久订阅者,因为需要指定 containerFactory 才能使 JMSListener 成为持久订阅者,否则它将只采用默认值:

但是仅仅做到这一步并不能使客户端成为可持续化的订阅者,这是因为 JMSListener 需要指定 containerFactory,否则它只会采用默认设置:

@JmsListener(
destination = "venta.topic",
id = "comercial",
subscription = "venta.topic",
//this was also needed with the same name as the bean above
containerFactory = "jmsListenerContainerFactory" 
)
public void receiveMessage(Venta venta) {
            logger.log(Level.INFO, "RECEIVED : {0}",venta);      
            repository.save(venta);
}
值得一提的是,这篇文章让我意识到了我的错误。
希望这能帮助其他人。

5

DefaultJmsListenerContainerFactory应具有唯一的clientId和durable sub. true设置,代码如下:

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory =
            new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setDestinationResolver(destinationResolver());
    factory.setConcurrency("3-10");
    factory.setClientID("brokerClientId");
    factory.setSubscriptionDurable(true);
    return factory;
}

1
谢谢你指导我正确的方向,但是我还是遇到了问题,看起来我需要在JMSListener上注册工厂。 - Hugo sama
正确的做法是,在你的JmsListener中注册自定义的containerFactory。 - NiranjanK
通过修改 YAML 配置文件,是否可以实现这个功能? - mohamnag

3

很难确定,但这个问题的常见原因是忘记在connectionFactory bean上配置唯一的clientId。它必须是唯一的,是代理程序区分每个客户端的方式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接