清空RabbitMQ中的队列

6
我在日常交易中使用RabbitMQ。我的消费者是部署在多台机器上的.NET桌面应用程序。每天只有在一定时间范围内才会将交易推送到队列中。超出此时间需要强制停止任何新交易。我已成功停止向队列发送新交易。但是,队列中现有的交易也需要被清除,以便不发送给任何消费者。

我尝试搜索这个问题,但没有找到除了两个选项之外的解决方法-

  • 每天删除并重新创建队列
  • 停止队列的所有消费者

这两种方法都可以实现,但需要对我的系统进行重大改变。我想知道是否有更好的方法。


你尝试过使用这个功能吗:https://www.rabbitmq.com/ttl.html - Andrew Skirrow
谢谢,我会去查看这个的。 - Souvik Ghosh
rabbitmqadmin 中有一个功能可以清空队列,rabbitmqadmin purge queue name=queue_name。虽然这不是您代码库中的高级操作,但这意味着您无需删除队列或停止消费者。 - Pär Eriksson
@PärEriksson 是的,我检查过了。我需要以某种方式用C#完成这个。 - Souvik Ghosh
很不幸,没有API可以清除队列。但是,rabbitmqadmin可以使用一些命令。我已经找到了一些方法,并正在进行测试。如果成功的话,我会在这里发布。 - Souvik Ghosh
显示剩余2条评论
7个回答

13

在C#中,您可以通过以下方式清空队列:

            ConnectionFactory factory = new ConnectionFactory();

            factory.HostName = "localhost";
            factory.UserName = "guest";
            factory.Password = "guest";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.QueuePurge(queueName);
                }

            }

3
此篇博客文章介绍了在RabbitMQ中不同的方法来清空队列。 rabbitmqadmin: 管理插件附带了一个命令行工具rabbitmqadmin,它可以执行与基于Web的用户界面(RabbitMQ管理界面)相同的操作。
用于清空单个队列中所有消息的脚本为:
$ rabbitmqadmin purge queue name=name_of_queue

HTTP API: RabbitMQ管理插件提供了一个基于HTTP的API来管理和监控您的RabbitMQ服务器。

curl -i -XDELETE https://USERNAME:PASSWORD@HOST/api/queues/vhost/QUEUE_NAME/contents

策略:添加一个与队列名称匹配的最大长度规则的策略。可以通过进入管理界面并点击管理员选项卡来添加策略。(应用策略后不要忘记删除它。)


抱歉,我没有RabbitMQAdmin,所以只能通过代码来完成。 - Souvik Ghosh

1
另一个选择是使用下面的代码和HareDu 2 Broker API。文档在这里: https://github.com/ahives/HareDu2
var result = _container.Resolve<IBrokerObjectFactory>()
                .Object<Queue>()
                .Empty(x =>
                {
                    x.Queue("your_queue");
                    x.Targeting(t => t.VirtualHost("your_vhost"));
                });

0
如果你想清空非零队列,可以使用以下命令:
sudo rabbitmqctl list_queues| awk '{if($2 > 0) print $1;}'| xargs -t -n1 sudo rabbitmqctl purge_queue 当然,你也可以自定义命令来清空所有队列。

0
如果必须选择这两种方法之一,并且如您在评论中所说,需要以某种方式使用C#,那么简单地使用HTTP API。在该页面上搜索/api/queues/vhost/name/contents,这就是您调用删除方法的位置。

这会删除队列,是吧? - Souvik Ghosh
是的。我没有粘贴正确的路径-末尾缺少“contents”。我会编辑答案。 - cantSleepNow
山旅行者也在我的答案中给出了这个信息作为回答。 - cantSleepNow

0
更具体地说,从HTTP API:Http动词DELETE在/api/queues/vhost/name/contents 引用:“队列的内容。删除以清除。请注意,您无法获取此内容。”

0
根据Andy Skirrow(在评论中)的建议,我已经在将消息发布到RabbitMQ的JSON有效载荷中设置了每条消息的“过期时间”。
我的代码-
using (System.Net.WebClient client = new System.Net.WebClient())
{
    client.Credentials = new System.Net.NetworkCredential(rmq_user, rmq_pass);
    client.Headers.Set("Content-Type", "application/json");
    response = client.UploadString(messagePath, jsonPayload);
}

这是我的有效载荷:

{"payload":"{\"PerformAutomation\":{\"AutomationInputDictionary\":{\"Search.Ban\":\"Holidays from=10th Mar 2017;Holidays to=13th Mar 2017;WalmartID=00155628;ticket_number=1226004;TicketType=HOLIDAY REQUEST;RawTicketData=PERN: 00155628\\r\\nHoliday Request -------------------- Holiday from 10th Mar 2017 to 13th Mar 2017\"},\"ProcessName\":\"HRProc\",\"ProfileName\":\"HR\",\"APIVersion\":\"\",\"AppId\":\"\",\"CommandExecutionWindow\":\"\",\"CommandGenerationSource\":\"\",\"Country\":\"\",\"Instance\":\"\",\"PartnerId\":\"\",\"ReferenceCode\":\"\",\"Timestamp\":\"5:07 AM\",\"UserName\":\"svcblpr\",\"VID\":\"\"}}","content_type":"string","content_encoding":"test/json","profile":"HR","expiration":604800000,"app_id":"wm_uc1_load_gen_app","source_message_id":"wm_uc1_load_gen_source","header":null}

所以,通过这种解决办法,我不必清除队列,因为根据载荷中设置的过期时间(毫秒值),消息将自动被删除。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接