RabbitMQ异步支持

32

RabbitMQ .NET客户端是否支持异步操作?我想要能够异步连接和消费消息,但是到目前为止还没有找到方法。

(对于消费消息,我可以使用EventingBasicConsumer,但这不是一个完整的解决方案。)

只是提供一些背景信息,这是我目前在使用RabbitMQ的方式(代码取自我博客):

var factory = new ConnectionFactory() { HostName = "localhost" };

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += Consumer_Received;
        channel.BasicConsume("testqueue", true, consumer);

        Console.ReadLine();
    }
}

你能具体一点吗?在这种情况下,你所说的“异步”是什么意思?你想要实现什么? - Derick Bailey
async/await...所以我正在寻找可等待且返回任务的等效项,就像System.IO有的ConnectAsync(),ReadAsync()等。 - Gigi
4个回答

51

Rabbit支持使用AsyncEventingBasicConsumer类将消息分派到异步消息处理程序。它的工作方式类似于EventingBasicConsumer,但允许您注册一个返回Task的回调函数。回调函数被分派到并且RabbitMQ客户端会等待返回的Task

var factory = new ConnectionFactory
{
    HostName = "localhost",
    DispatchConsumersAsync = true
};

using(var connection = cf.CreateConnection())
{
    using(var channel = conn.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);

        var consumer = new AsyncEventingBasicConsumer(model);

        consumer.Received += async (o, a) =>
        {
            Console.WriteLine("Message Get" + a.DeliveryTag);
            await Task.Yield();
        };
    }

    Console.ReadLine();
}

我无法确定这是在哪个版本中添加的,但相关的提交是在2017年2月份。 - Paul Turner
2
似乎自5.0.0-pre3版本以来就已经可用了。 - Gigi
@Gigi,我们的生产者支持异步吗? - Pingpong
@Pingpong 我不知道。 - Gigi
请查看以下链接:https://gist.github.com/kjnilsson/732c0883c7807647e84ba5be2c3027f5 - Matteo1010

13

2
这真的很荒谬,在这么多年之后,对于如此受欢迎的平台来说…… - Ahmad

4

总结当前的async/TPL支持:

  • 正如@paul-turner提到的那样,现在有一个AsyncEventingBasicConsumer,你可以为其注册事件并返回一个Task
  • 还有一个AsyncDefaultBasicConsumer,你可以重写虚拟方法,例如HandleBasicDeliver并返回Task。原始PR在这里(看起来它也是在5.0中引入的?)
  • 根据上述PR的最终评论和这个问题,看起来他们正在开发一个全新的.NET客户端,该客户端将更完全地支持async操作,但我没有看到任何具体的链接。

3

有一个 AsyncEventingBasicConsumer,它所做的就是在接收到消息时等待异步的“事件处理程序”。这是这里唯一的异步操作。通常情况下,您不会从中获得任何收益,因为只有一个“处理程序”,消息仍然是逐个处理的。它们是同步处理的!此外,由于等待是在 Consumer 中完成的,您失去了异常处理的控制。

让我猜测,通过异步消息处理,您是指某种程度的并行处理。

我最终使用的是 TPL Dataflow 的 ActionBlockActionBlock 可以运行您配置的任务数,并管理等待和并行性。由于它的操作是基于 Tasks 而不是 Threads,所以只要它们是真正异步的,就可以管理较少的资源。

  1. 常规 EventingBasicConsumer 调用 actionBlock.Post(something)
  2. 要进行并行处理,您需要告诉 RMQ 在您 ack 它们之前发送 N 条消息:model.BasicQos(0, N, true);
  3. ActionBlock 有一个选项 MaxDegreeOfParallelism 属性,也需要设置成 N。
  4. ActionBlock 运行接收之前由 Consumer 提交的数据的 async Task,这些任务不应该抛出异常,因为 ActionBlock 会在发生异常时停止所有处理。
  5. 请注意传递 CancellationToken 并正确等待 ActionBlock 完成所有正在运行的 Tasks:actionBlock.Complete(); await actionBlock.Completion;

截至今天,这是不正确的。我昨天进行了测试,并且在事件处理程序中遇到await时,事件处理程序外的代码也会被执行。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接