基于事件的RabbitMQ C# API消息消费

10
while (true)
{
    BasicDeliverEventArgs e = (BasicDeliverEventArgs)Consumer.Queue.Dequeue();
    IBasicProperties properties = e.BasicProperties;
    byte[] body = e.Body;
    Console.WriteLine("Recieved Message : " + Encoding.UTF8.GetString(body));
    ch.BasicAck(e.DeliveryTag, false);
}

这是我们按订阅获取消息时所做的事情。我们使用 While 循环,因为我们希望消费者持续监听。如果我想要基于事件驱动,当新消息到达队列时,只有在那时消费者才会消费该消息,或者在任何类似的事件上。


1
你能使用EventingBasicConsumer吗? http://www.java2s.com/Open-Source/CSharp/Message/RabbitMQ/RabbitMQ/Client/Events/EventingBasicConsumer.cs.htm - maxfridbe
1
参见:https://dev59.com/xmcs5IYBdhLWcg3w5H5a - Babak
3个回答

9

使用RabbitMQ.Client.Events.EventingBasicConsumer来代替阻塞式消费者,实现事件驱动的消费。


3
请注意,EventingBasicConsumer目前被列为实验性功能。 - WhiteKnight
我在我的网站上有一个完整的示例,展示如何使用C#实现这个。http://www.jarloo.com/listening-to-rabbitmq-events/ - Kelly

7

您当前正在阻塞于 Consumer.Queue.Dequeue()。如果我正确理解了您的问题,您希望异步地消费消息。

标准的做法是编写自己的 IBasicConsumer(可能通过子类化 DefaultBasicConsumer 实现),并将其设置为通道的 消费者

这样做的问题在于,你必须非常小心地处理IBasicConsumer.HandleBasicDelivery中的操作。如果使用任何同步AMQP方法(例如basic.publish),会导致死锁。如果执行任何需要很长时间的操作,将遇到其他问题。
如果确实需要同步方法或长时间运行的操作,则你正在做的方法大致正确。查看Subscription,它是一个IBasicConsumer,可以消费消息并将其放入队列中。
如果需要更多帮助,请提问rabbitmq-discuss邮件列表。

5
不要在多个线程之间共享IModel。相较于调试并发 bug 的成本,创建新的连接/模型非常便宜。 - Carl Hörberg
根据 .net 客户端用户指南,应用程序处理程序可以调用 IModel.BasicPublish 而不会导致死锁,因为它们是异步操作。但从处理程序调用 QueueDeclare 或 BasicCancel 将导致死锁。 - maulik13

4
我遇到了这个问题,但找不到答案,所以创建了一个演示项目,使RabbitMQ订阅在收到消息时引发.NET事件。该订阅运行在自己的线程上,使UI(在我的情况下)可以自由地执行其任务。
我称之为RabbitEar,因为它监听来自强大的RabbitMQ的消息。我打算与RabbitMQ网站分享此内容,以便他们认为有价值时可以在其示例中包含链接/代码。
请访问http://rabbitears.codeplex.com/查看。
谢谢 Simon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接