如何为多个主题配置多个JmsListener

15

在我的项目中,我添加了两个不同的JmsListener,但是当我在ActiveMQ面板中运行项目时,只有其中一个主题有消费者!

所以,我应该为每个JmsListener添加单独的jmsListenerContainerFactory配置吗?

@JmsListener(destination = "foo1")
public void foo1(final Message jsonMessage) throws JMSException {
    ...
}

@JmsListener(destination = "foo2")
public void foo12(final Message jsonMessage) throws JMSException {
    ...
}

编辑:这是来自JMS配置文件的内容:


@Configuration
@EnableJms
public class FooJmsConfig {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        connectionFactory.setPassword(BROKER_USERNAME);
        connectionFactory.setUserName(BROKER_PASSWORD);
        connectionFactory.setUseCompression(true);
        connectionFactory.setClientID("FPP_API");
        connectionFactory.setConnectionIDPrefix("DRR");
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        return factory;
    }
}

这是被支持的,你的代码片段是正确的,因此你的问题出在其他地方,你需要提供更多信息。 - KeatsPeeks
@KeatsPeeks,支持什么?所有的jmsListeners都使用同一个jmsListenerContainerFactory吗? - mohsenJsh
3个回答

14

我发现通过将 setClientID() 方法从 ActiveMQConnectionFactory 提供者方法移动到 DefaultJmsListenerContainerFactory 提供者方法中,

我可以只拥有一个全局的 ActiveMQConnectionFactory 提供者方法,并为每个 jmsListener 拥有多个 DefaultJmsListenerContainerFactory 提供者方法:

因此最终的工作代码如下:

JMSConfig 文件:

@Configuration
@EnableJms
public class FooJmsConfig {

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL(BROKER_URL);
    connectionFactory.setPassword(BROKER_USERNAME);
    connectionFactory.setUserName(BROKER_PASSWORD);
    connectionFactory.setUseCompression(true);

    connectionFactory.setConnectionIDPrefix("DRR");
    connectionFactory.setUseAsyncSend(true);
    return connectionFactory;
}

@Bean(name= "foo1")
public DefaultJmsListenerContainerFactory foo1() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency("1-1");
    factory.setPubSubDomain(true);
    factory.setSubscriptionDurable(true);

    connectionFactory.setClientID("FOO_1");
    return factory;
}

 @Bean(name= "foo2")
public DefaultJmsListenerContainerFactory foo2() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency("1-1");
    factory.setPubSubDomain(true);
    factory.setSubscriptionDurable(true);

    connectionFactory.setClientID("FOO_1");
    return factory;
}

}

并且 JMS 监听器将会被启用。

@JmsListener(destination = "foo1", containerFactory="foo1")
public void foo1(final Message jsonMessage) throws JMSException {
...
}

@JmsListener(destination = "foo2", containerFactory="foo2")
public void foo12(final Message jsonMessage) throws JMSException {
...
}

1
对于 foo2(),您设置了 setClientID("FOO_1");,我猜这是一个打字错误? - Piro

7
我认为问题在于您将两个监听器都使用了并发度为1JmsListenerContainerFactory。如果需要,您可以创建多个JmsListenerContainerFactory,并通过指定JmsListener#containerFactory属性为JmsListener设置它们。请注意保留HTML标记。

0

您可以建立多个JMSFactory连接。默认的JMS监听器只接受单例connectionfactory对象。因此,您需要设置两个jmslistenercontainerfactory本身的bean。

连接到各个主题的代码如下:

@Configuration
@EnableJms
@EnableTransactionManagement
public class JMSConnectionConfig{

private static final String AMQP_URI_FORMAT = "amqps://%s?amqp.idleTimeout=%d";

private int idleTimeout;

private String hostURL;

@Bean(name = "cachingConnectionFactory1")
@Primary
public ConnectionFactory myConnectionFactory1() {
    // set up connection details to the topic
    String remoteUri = String.format(AMQP_URI_FORMAT, hostURL, idleTimeout);
    JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
    jmsConnectionFactory.setRemoteURI(remoteUri);
    jmsConnectionFactory.setClientID(clientId);
    jmsConnectionFactory.setUsername(topic1SASName);
    jmsConnectionFactory.setPassword(topic1SASKey);
    // create caching factory object and return as connectionfactory parent object
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setTargetConnectionFactory(jmsConnectionFactory);
    cachingConnectionFactory.setReconnectOnException(true);
    return cachingConnectionFactory;
}

@Bean(name = "factory1")
public JmsListenerContainerFactory<?> factory1(@Qualifier("cachingConnectionFactory1") ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
    topicFactory.setConnectionFactory(connectionFactory);
    topicFactory.setSubscriptionDurable(Boolean.TRUE);
    // configure DefaultJmsListenerContainerFactoryConfigurer with caching factory and listener factory
    configurer.configure(topicFactory, connectionFactory);
    return topicFactory;
}

// Goes here for another topic in the same way as above two methods to establish initial connection to the topic
}

JMS监听器消息接收类的代码将如下所示,您可以将目标名称提供为您的主题名称,对于工厂,您可以提供我们设置的连接工厂的bean:

@JmsListener(destination = "${topic.name}", containerFactory = "factory1",
        subscription = "${topic.subscription.name}")
public void receiveMessage(JmsTextMessage jmsTextMessage) throws JMSException, IOException, InterruptedException {
    // listener code goes here
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接