如何创建一个Spring Boot消费者来消费ActiveMQ队列?

5
我正在学习ActiveMQ,并且已经制作了一个简单的Spring Boot生产者+消费者应用程序(在本问题中称其为 App1 ),该应用程序与本地ActiveMQ实例通信,一切都按预期工作。
现在,我正在尝试运行另一个Spring Boot应用程序(在同一台计算机上,但在确保 App1 未运行之后),该应用程序仅具有消费者(没有生产者)。但是,当我启动此应用程序时,在队列中放置的消息(使用修改后的 App1 ,其中我删除了应用程序的消费者部分)不会被接收。在 App1 中,一旦发布消息,消费者就会打印出系统输出语句,但在这个仅有消费者的应用程序中则不然。以下是我的监听器组件类:
package com.demo.listener;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "testqueue")
    public void consume(String message) {

        System.out.println("Picked up message: " + message);

    }
}

我需要做什么修改才能实现所需的行为?

App1 application.properties 文件:

spring.activemq.in-memory=false
spring.activemq.pool.enabled=false
server.port=9000
activemq.broker-url=tcp://localhost:61616
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration
security.basic.enabled=false
management.security.enabled=false

App1 JmsConfig类

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class JmsConfig {

    @Value("${activemq.broker-url}")
    private String brokerUrl;

    @Bean
    public Queue queue() {
        return new ActiveMQQueue("testqueue");
    }

    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        return factory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        return  new JmsTemplate(activeMQConnectionFactory());
    }

}

App1 生产者类

import javax.jms.Queue;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rest/publish")
public class ProducerResource {

    @Autowired
    JmsTemplate jmsTemplate;

    @Autowired
    Queue queue;

    @GetMapping("/{message}")
    public String publishMessage(@PathVariable("message") final String message) {

        jmsTemplate.convertAndSend(queue, message);

        return "Published successfully";
    }

}

App1 消费者类是我在仅消费者应用程序中使用的相同类(如上所述)。


如果可能的话,请同时发布两个应用程序的application.properties文件。还需要listener配置类代码。您提供的信息太少,无法找出根本原因。 - Amith Kumar
@AmithKumar,我已经添加了App1属性文件。对于第二个应用程序(仅消费者应用程序),那是我唯一与JMS相关的类,并且属性文件中没有MQ/JMS设置。 - ITWorker
如果您使用Spring-Boot,对于像您这样的简单用例,您不需要配置bean。 - рüффп
请注意,activemq的url属性应该以spring为前缀。像这样:spring.activemq.broker-url=tcp://192.168.1.210:9876 - рüффп
1个回答

5

针对您的消费者应用,您需要添加池连接工厂和JMS消息监听器工厂,以便为您的消费者JMStemplate开始接收消息。

@Configuration
@EnableJms
public class ConsumerConfig {

  @Value("${activemqbrokerurl}")
  private String brokerUrl;

  @Bean
  public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setBrokerURL(brokerUrl);
    return activeMQConnectionFactory;
  }

  @Bean
  public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setConnectionFactory(activeMQConnectionFactory());
    factory.setConcurrency("{#setDesiredConcurrency}");
    return factory;
  }
}

Spring的MessageListenerContainer应该用于消息消费。这提供了MDB的所有功能——高效的JMS消费和消息监听器池,但不需要完整的EJB容器。

您可以使用ActiveMQ池org.apache.activemq.pool.PooledConnectionFactory来有效地池化消费者集合的连接和会话,或者您可以使用Spring JMS org.springframework.jms.connection.CachingConnectionFactory来达到相同的效果。您可以在这里阅读更多信息。

@JmsListener注解中添加containerFactory = "jmsListenerContainerFactory"参数后,这个问题得到了解决。 - ITWorker
要在Spring Boot中启用ActiveMQ池化,您无需进行额外的配置。只需添加属性和pom依赖项org.messaginghub:pooled-jms即可。官方文档在此处。所有bean都已默认定义。如果您需要从默认的Spring Boot配置派生,请使用配置Bean。 - рüффп

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接