在ASP.NET Core应用程序中设置RabbitMQ消费者

33

我有一个ASP.NET Core应用程序,我想消费RabbitMQ消息。

我已经成功地在命令行应用程序中设置了发布者和消费者,但我不确定如何在Web应用程序中正确设置它。

我考虑在Startup.cs中初始化它,但当然一旦启动完成就会死掉。

如何从Web应用程序以正确的方式初始化消费者?


1
你确定ASP.NET是托管RabbitMQ消费者的正确位置吗?你能否有一个命令行应用程序来消费RabbitMQ消息,并在接收到它们后发布到ASP.NET? - Michael
我不确定,但这可能是最方便的,因为我们目前没有部署和运行其他类型应用程序的体系。我想一个Windows服务会起到作用,但如果有一种安全可靠的方法可以从我们的Web应用程序中执行它就更好了。 - severin
值得一提的是,所讨论的Web应用程序已经使用Hangfire执行相关的后台作业,因此将其放置在这里似乎是一个合乎逻辑的地方。 - severin
Hangfire!!! 请查看 将 HangFire 与 ASP.NET Core 集成。我认为这可能是一个很好的起点,因为你同时拥有 HangFire 和 ASP.NET Core。 - Michael
3
Hangfire不是关于长寿命对象的。开销巨大。 - Ilya Chumakov
2
我发现使用请求/响应主机(Web服务器)来托管长时间运行的事件消费者存在许多问题。在重新启动时保持稳定数量的消费者,IIS关闭进程等都会增加额外的复杂性,而且Web服务器根本没有为这种用例设计。我看到Hangfire也支持在Windows服务中托管,这样您就可以获得Hangfire和适当主机的好处。 - Vanlightly
4个回答

42

在应用程序运行期间,使用Singleton模式来保留消费者/监听器。使用IApplicationLifetime接口来在应用程序启动/停止时启动/停止消费者。

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<RabbitListener>();
    }


    public void Configure(IApplicationBuilder app)
    {
        app.UseRabbitListener();
    }
}

public static class ApplicationBuilderExtentions
{
    //the simplest way to store a single long-living object, just for example.
    private static RabbitListener _listener { get; set; }

    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        _listener = app.ApplicationServices.GetService<RabbitListener>();

        var lifetime = app.ApplicationServices.GetService<IApplicationLifetime>();

        lifetime.ApplicationStarted.Register(OnStarted);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        lifetime.ApplicationStopping.Register(OnStopping);

        return app;
    }

    private static void OnStarted()
    {
        _listener.Register();
    }

    private static void OnStopping()
    {
        _listener.Deregister();    
    }
}
  • 你应该注意你的应用程序托管在哪里。例如,IIS 可能会回收并停止你的代码运行。
  • 这种模式可以扩展到一组监听器。

1
RabbitListener 是什么? - Prageeth godage
1
@Prageeth 这只是从队列中获取消息的代码。您自己的实现将取决于系统要求和队列定义。您可以在网络上找到很多示例,其中之一是 https://github.com/plwestaxiom/RabbitMQ/blob/master/RabbitMQ_Tutorials/RabbitShared/Receiver.cs#L28 - Ilya Chumakov
consumer.Received += (model, ea) => 这个仅在应用启动时被调用一次且不会继续监听。我不知道我是否遗漏了什么,Ilya。 - Prageeth godage
1
我错过了这一行 Console.ReadLine(); 如果我加上它,它就可以完美地工作。 - Prageeth godage
@IlyaChumakov,您能否提供使用RabbitListener的完整示例? - JustDontKnow
显示剩余5条评论

17

这是我的监听器:

public class RabbitListener
{
    ConnectionFactory factory { get; set; }
    IConnection connection { get; set; }
    IModel channel { get; set; }

    public void Register()
    {
        channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            int m = 0;
        };
        channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
    }

    public void Deregister()
    {
        this.connection.Close();
    }

    public RabbitListener()
    {
        this.factory = new ConnectionFactory() { HostName = "localhost" };
        this.connection = factory.CreateConnection();
        this.channel = connection.CreateModel();


    }
}

5
另一种选择是托管服务
您可以创建一个HostedService,并调用一个方法来注册RabbitMq监听器。
public interface IConsumerService
{
    Task ReadMessgaes();
}

public class ConsumerService : IConsumerService, IDisposable
{
    private readonly IModel _model;
    private readonly IConnection _connection;
    public ConsumerService(IRabbitMqService rabbitMqService)
    {
        _connection = rabbitMqService.CreateChannel();
        _model = _connection.CreateModel();
        _model.QueueDeclare(_queueName, durable: true, exclusive: false, autoDelete: false);
        _model.ExchangeDeclare("your.exchange.name", ExchangeType.Fanout, durable: true, autoDelete: false);
        _model.QueueBind(_queueName, "your.exchange.name", string.Empty);
    }
    const string _queueName = "your.queue.name";
    public async Task ReadMessgaes()
    {
        var consumer = new AsyncEventingBasicConsumer(_model);
        consumer.Received += async (ch, ea) =>
        {
            var body = ea.Body.ToArray();
            var text = System.Text.Encoding.UTF8.GetString(body);
            Console.WriteLine(text);
            await Task.CompletedTask;
            _model.BasicAck(ea.DeliveryTag, false);
        };
        _model.BasicConsume(_queueName, false, consumer);
        await Task.CompletedTask;
    }

    public void Dispose()
    {
        if (_model.IsOpen)
            _model.Close();
        if (_connection.IsOpen)
            _connection.Close();
    }
}

RabbitMqService:

public interface IRabbitMqService
{
    IConnection CreateChannel();
}

public class RabbitMqService : IRabbitMqService
{
    private readonly RabbitMqConfiguration _configuration;
    public RabbitMqService(IOptions<RabbitMqConfiguration> options)
    {
        _configuration = options.Value;
    }
    public IConnection CreateChannel()
    {
        ConnectionFactory connection = new ConnectionFactory()
        {
            UserName = _configuration.Username,
            Password = _configuration.Password,
            HostName = _configuration.HostName
        };
        connection.DispatchConsumersAsync = true;
        var channel = connection.CreateConnection();
        return channel;
    }
}

最后,创建一个HostedService并调用ReadMessages方法来注册:

public class ConsumerHostedService : BackgroundService
{
    private readonly IConsumerService _consumerService;

    public ConsumerHostedService(IConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _consumerService.ReadMessgaes();
    }
}

注册服务:

services.AddSingleton<IRabbitMqService, RabbitMqService>();
services.AddSingleton<IConsumerService, ConsumerService>();
services.AddHostedService<ConsumerHostedService>();

在这种情况下,当应用程序停止时,您的消费者将自动停止。
附加信息:
appsettings.json:
{
  "RabbitMqConfiguration": {
    "HostName": "localhost",
    "Username": "guest",
    "Password": "guest"
  }
}

RabbitMq配置

public class RabbitMqConfiguration
{
    public string HostName { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
}

代码不起作用,显示DI错误。 - undefined
1
@sina_Islam 请分享您的错误信息 - undefined
即使消费者也无法消费信息。 - undefined
1
@sina_Islam 我不知道你如何实现rabbitmq。更多信息,请查看这个链接 - undefined
1
谢谢兄弟,它正在运作。实际上,我正在将它作为一个插件在nopCommerce中实现,以消费订单下单事件,但由于缺少一些配置,所以它没有正常工作。你的代码完美无缺,按预期运行。 - undefined

0

我发现最好的方法之一是使用BackgroundService

   public class TempConsumer : BackgroundService
{
    private readonly ConnectionFactory _factory;
    private IConnection _connection;
    private IModel _channel;

    public TempConsumer()
    {
        _factory = new ConnectionFactory()
        {
            HostName = "localhost",
            UserName = "guest",
            Password = "password",
            VirtualHost = "/",
        };
        _connection = _factory.CreateConnection() ;
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: "queue",
                                durable: false,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    }
  
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();
        
        var consumer = new EventingBasicConsumer(_channel);

        consumer.Shutdown += OnConsumerShutdown;
        consumer.Registered += OnConsumerRegistered;
        consumer.Unregistered += OnConsumerUnregistered;
        consumer.ConsumerCancelled += OnConsumerConsumerCancelled;


        consumer.Received += (model, ea) =>
        {
            Console.WriteLine("Recieved");
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());
            _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            Console.WriteLine(message);
        };


        _channel.BasicConsume(queue: "queue",
                             autoAck: false,
                             consumer: consumer);

        return Task.CompletedTask;
    }

    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

然后,将消费者注册为托管服务 services.AddHostedService<EmailConsumer>();


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接