如何让Spring RabbitMQ创建一个新的队列?

26

根据我(有限的)使用rabbit-mq的经验,如果你为一个尚不存在的队列创建新的监听器,该队列会自动创建。我正在尝试使用Spring AMQP项目和rabbit-mq来设置一个监听器,但是却收到了错误提示。以下是我的xml配置:

<rabbit:connection-factory id="rabbitConnectionFactory" host="172.16.45.1" username="test" password="password" />

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="testQueueListener" queue-names="test" />
</rabbit:listener-container>

<bean id="testQueueListener" class="com.levelsbeyond.rabbit.TestQueueListener"> 
</bean>

我在RabbitMq日志中看到了以下内容:

=ERROR REPORT==== 3-May-2013::23:17:24 ===
connection <0.1652.0>, channel 1 - soft error:
{amqp_error,not_found,"no queue 'test' in vhost '/'",'queue.declare'}

AMQP 也会出现类似的错误:

2013-05-03 23:17:24,059 ERROR [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] (SimpleAsyncTaskExecutor-1) - Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.

从堆栈跟踪中看,队列是以“被动”模式创建的-有谁能指出我如何创建队列而不使用被动模式,以便我不会看到这个错误?或者我还遗漏了其他什么吗?

5个回答

27

虽然这是一个较旧的帖子,但在谷歌上仍然排名很高,因此以下是一些更新的信息:

2015-11-23

Spring 4.2.x与Spring-Messaging,Spring-Amqp 1.4.5.RELEASESpring-Rabbit 1.4.5.RELEASE以来,通过@Configuration类和一些注释声明交换、队列和绑定变得非常简单:

@EnableRabbit
@Configuration
@PropertySources({
    @PropertySource("classpath:rabbitMq.properties")
})
public class RabbitMqConfig {    
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqConfig.class);

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port:5672}")
    private int port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);

        logger.info("Creating connection factory with: " + username + "@" + host + ":" + port);

        return connectionFactory;
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * This queue will be declared. This means it will be created if it does not exist. Once declared, you can do something
     * like the following:
     * 
     * @RabbitListener(queues = "#{@myDurableQueue}")
     * @Transactional
     * public void handleMyDurableQueueMessage(CustomDurableDto myMessage) {
     *    // Anything you want! This can also return a non-void which will queue it back in to the queue attached to @RabbitListener
     * }
     */
    @Bean
    public Queue myDurableQueue() {
        // This queue has the following properties:
        // name: my_durable
        // durable: true
        // exclusive: false
        // auto_delete: false
        return new Queue("my_durable", true, false, false);
    }

    /**
     * The following is a complete declaration of an exchange, a queue and a exchange-queue binding
     */
    @Bean
    public TopicExchange emailExchange() {
        return new TopicExchange("email", true, false);
    }

    @Bean
    public Queue inboundEmailQueue() {
        return new Queue("email_inbound", true, false, false);
    }

    @Bean
    public Binding inboundEmailExchangeBinding() {
        // Important part is the routing key -- this is just an example
        return BindingBuilder.bind(inboundEmailQueue()).to(emailExchange()).with("from.*");
    }
}

一些可供参考的资源和文档:

  1. Spring注解
  2. 声明/配置RabbitMQ以支持队列/绑定
  3. 直接交换绑定(用于路由键不重要的情况)

注意:看起来我遗漏了一个版本--从Spring AMQP 1.5开始,您可以在监听器中声明完整的绑定,这使得事情变得更加容易!


你可以创建Bean,并通过方法调用再次创建对象。 - user2636594

11

似乎解决我的问题的是添加管理员。这是我的xml:

<rabbit:listener-container connection-factory="rabbitConnectionFactory"  >
    <rabbit:listener ref="orderQueueListener" queues="test.order" />
</rabbit:listener-container>

<rabbit:queue name="test.order"></rabbit:queue>

<rabbit:admin id="amqpAdmin" connection-factory="rabbitConnectionFactory"/>

<bean id="orderQueueListener" class="com.levelsbeyond.rabbit.OrderQueueListener">   
</bean>

3
是的,您需要在配置中添加一个 RabbitAdmin 来自动声明队列、交换机和绑定。 - Gary Russell
2
此外,只有在向队列发送消息之后,该队列才会被声明。 - Rori Stumpf
2
它在Java注释模式下也可以工作:添加一个自动装配的AmqpAdmin并使用专用方法声明队列时,如果不存在,则会自动创建队列! - Ing. Luca Stucchi
通过使用rabbit:listener元素,如何让Rabbit声明一个随机命名的队列以与扇形交换机一起使用? - Fopedush

9

Spring Boot 2.1.6Spring AMQP 2.1.7开始,你可以使用以下代码在启动时创建队列(如果它们不存在):

@Component
public class QueueConfig {

    private AmqpAdmin amqpAdmin;

    public QueueConfig(AmqpAdmin amqpAdmin) {
        this.amqpAdmin = amqpAdmin;
    }

    @PostConstruct
    public void createQueues() {
        amqpAdmin.declareQueue(new Queue("queue_one", true));
        amqpAdmin.declareQueue(new Queue("queue_two", true));
    }
}

4

你能在连接标签后面,但监听器之前添加这个吗:

<rabbit:queue name="test" auto-delete="true" durable="false" passive="false" />

很不幸,根据XSD模式,上述被列出的被动属性是无效的。然而,在我所见过的每个queue_declare实现中,被动参数都是有效的queue_declare参数。我很好奇是否会起作用,或者他们计划在未来支持它。

这里是队列声明的完整选项列表: http://www.rabbitmq.com/amqp-0-9-1-reference.html#class.queue

这是spring rabbit schema的完整XSD(包括注释): http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd


很不幸,我在启动时遇到了一个错误,即队列元素中不允许使用“被动”属性。奇怪的是,“声明”方法接受被动参数,但我似乎无法在xml中定义它。 - eric
抱歉,我可能有点迟钝,我不知道自己哪里出了问题。我明白你的意思是根据 xsd 它不是有效的 xml,我可以忽略 eclipse 给我的错误提示 - 但实际上我也得到了一个运行时错误。 - eric
没关系,我的中间名是dense。 :-) 我认为我们在说同样的事情。在我看来,如果它是一个完整的rabbit接口,上述被动参数应该存在。Spring RabbitMQ缺少它。我会向他们提交一个错误报告,并在您的错误报告中链接回这个问题。他们可能知道我们都错过了什么,或者这可能只是一个合理的遗漏。 - Homer6

0
如果之前您使用的是spring-rabbit版本<1.6,现在升级到该版本或之后,并且发现您的队列没有被创建,那么很可能您缺少了一个RabbitAdmin bean。以前的版本似乎不需要在上下文中使用它,但1.6及其之后的版本需要。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接