如何检查RabbitMQ消息队列是否存在?

43

如何检查消息队列是否已经存在?

我有两个不同的应用程序,一个创建队列,另一个从队列中读取。

因此,如果我先运行从队列中读取的客户端,则会崩溃。为了避免这种情况,我想先检查队列是否存在。

这里是我读取队列的代码片段:

QueueingBasicConsumer <ConsumerName> = new QueueingBasicConsumer(<ChannelName>); 
<ChannelName>.BasicConsume("<queuename>", null, <ConsumerName>); 
BasicDeliverEventArgs e = (BasicDeliverEventArgs)<ConsumerName>.Queue.Dequeue();

这里是我读取队列的代码片段:QueueingBasicConsumer <ConsumerName> = new QueueingBasicConsumer(<ChannelName>); <ChannelName>.BasicConsume("<queuename>", null, <ConsumerName>);BasicDeliverEventArgs e = (BasicDeliverEventArgs)<ConsumerName>.Queue.Dequeue(); - Jigar Sheth
我已经将那段代码片段添加到您的帖子中。在未来,如果需要添加更多上下文,请点击“编辑”链接,而不是添加评论。有关更多信息,请参阅评论帮助页面上的“何时应该评论?”部分。 - Sᴀᴍ Onᴇᴌᴀ
8个回答

82

不用费力查看。

queue.declare 是一个幂等操作。因此,无论您运行一次、两次还是N次,结果仍然相同。

如果您想确保队列存在,请在使用之前先声明它。确保每次声明的耐久性、排他性和自动删除性相同,否则您将会得到异常。

如果您确实需要检查队列是否存在(通常情况下不需要),请对队列进行被动声明。该操作成功表示队列存在,如果不存在,则返回错误信息。


3
请问您能否提供在C# API中 passively 声明队列的语法? - Jigar Sheth
4
请使用IModel.QueueDeclare方法,并将passive参数设置为true。参考链接:http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.8.1/rabbitmq-dotnet-client-1.8.1-client-htmldoc/html/type-RabbitMQ.Client.IModel.html - scvalex
28
队列的声明可能是幂等的,但如果你不知道你尝试发布消息的队列参数(例如自动删除等),重新声明队列将会失败,因为队列设置不同。 - Dalibor Filus
4
检查队列的存在可能很有用。例如,如果你正在编写一个RPC服务,你可能希望在处理消息之前确保RPC客户端未离开。 - WGH
4
仅当用户配置了队列的权限时,这才有效。有时候写和读的用户是不同的,所以我认为这不是最佳解决方案。 - Gustavo Andrade Ferreira
显示剩余3条评论

11

将以下代码放入try catch块中。如果队列或交换机不存在,则会抛出错误。如果存在,则不会执行任何操作。

  var channel = connection.CreateModel();


  channel.ExchangeDeclarePassive(sExchangeName);

  QueueDeclareOk ok = channel.QueueDeclarePassive(sQueueName);

   if (ok.MessageCount > 0)
    {
      // Bind the queue to the exchange

     channel.QueueBind(sQueueName, sExchangeName, string.Empty);
    }

3
实际上,这是正确的答案。被选中作为正确答案的那个选项存在潜在风险,可能会在RabbitMQ服务器中声明和创建废弃队列。QueueDeclarePassive方法就是为这种情况设计的。 - Majid
谢谢 +1。值得注意的一点是,只保留通道用于此检查,因为如果检查失败,通道将变得无用。请参阅:https://github.com/streadway/amqp/issues/167。不相关的说明:通道是`IDisposable`。 - astrowalker

7

当有其他应用程序负责q声明时,这种方法将无效。而且我可能不知道q的所有参数,只知道其名称。

我更倾向于使用passiveDeclare并检查IOException以确保q不存在。


6

目前您可以通过RabbitMQ Management HTTP API获取这些信息和更多内容。

例如,要了解当前是否有一个队列正在运作,您可以调用API的/api/queues/vhost/name接口的GET方法。


API链接已失效,这个链接可能更加通用于此主题。 - watery
也许这是有效的链接:https://rawcdn.githack.com/rabbitmq/rabbitmq-management/v3.8.3/priv/www/api/index.html - Caius Jard
让一个真人登录控制台,或许给感兴趣的人打个电话或发送电子邮件?这不是一个严肃的解决方案。 - Rick O'Shea

4

Spring AMQP(Java 实现)中有一个元数据 API。

@Autowired
public RabbitAdmin rabbitAdmin;

//###############get you queue details##############
Properties properties = rabbitAdmin.getQueueProperties(queueName);

//do your custom logic
if( properties == null)
{
    createQueue(queueName);
}

1
使用QueueDeclare()来执行建议的操作。另外,我们一直在做的是,让队列的消费者成为队列的所有者,并始终发布到由发布者创建和拥有的交换机。然后,消费者将他们的队列绑定到他们希望从中接收流量的交换机,并使用适当的路由键过滤所需的流量。通过这种方式,对于非持久化队列,发布者不会被没有消费者而静音,而消费者可以自由地使用具有适当路由键映射的持久或非持久队列来进出。这样可以得到一个易于管理的系统,并允许使用Web管理创建持久队列并将其绑定到交换机,获取一些流量,解除绑定,然后检查队列内容以了解通过交换机传输的流量和负载。

我正在研究发布者声明交换机,消费者声明队列并将其绑定到交换机的分离方法。然而,如果在消费者尝试将队列绑定到未声明的交换机时,会抛出异常。您是否确保先初始化发布者,是否也在消费者中声明交换机,是否手动在UI/CLI中创建交换机,或者是否采取其他措施来防止此问题? - brenthompson2
1
我们让消费者宣布它被卡住了,然后等待交换机出现(在异常发生时循环),或者应用程序被关闭/终止。这使得系统可以以任何顺序启动,并且可以根据需要重新启动,而无需按照其启动顺序对发布者或消费者进行排序。 - Grwww

1
如其他答案中提到的,channel.QueueDeclarePassive(QueueName)可以让您在不尝试重新声明队列的情况下检查队列的存在(例如,如果更改了队列设置,否则会引发异常)。
然而,QueueDeclarePassive()似乎只检查根虚拟主机。如果您的目标队列实际上位于自定义虚拟主机中,则可能会得到错误的负面结果(未经测试)。如果在根虚拟主机中存在具有相同名称的队列,则肯定会得到错误的正面结果(已经测试)。
也许将QueueDeclare()放在try-catch中可能是更好的方法,但依我个人的看法,您将获得的RabbitMQ.Client.Exceptions.OperationInterruptedException过于宽泛,这样做并不安全。

0
在我看来,最好的方法是覆盖rabbitmq的默认配置。
主类应该禁用默认的rabbit配置:
    @SpringBootApplication
    @EnableAutoConfiguration(exclude 
  {org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration.class})
  public class MyApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args);
    }

}

我的 rabbitmq 属性存储在 rabbitmq.properties 文件中:
...
    rabbitmq.queue=my-queue
...

然后只需创建自己的配置rabbitmq组件:

@Component
@EnableRabbit
@PropertySource("classpath:rabbitmq.properties")
public class RabbitMQConfiguration
{
...
    @Value("${rabbitmq.queue}")
    private String queueName;

...

    @Bean
    public Queue queue() {
        return new Queue(queueName, false);
    }
...

同时需要设置消费者:

@Component
@PropertySource("classpath:rabbitmq.properties")
public class MyConsumer
{
    private static Logger LOG = LogManager.getLogger(MyConsumer.class.toString());

    @RabbitListener(queues = {"${rabbitmq.queue}"})
    public void receive(@Payload Object data) {
        LOG.info("Message: " + data) ;
    }

现在当客户端启动时,如果队列不存在,它会自动创建队列。如果队列已经存在,则不执行任何操作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接