在.NET Core中注册RabbitMQ消费者?

6
我正在尝试使用消息队列(RabbitMQ)处理微服务架构中的请求授权。我已经按照这些说明在.NET Core控制台应用程序中成功配置了接收器和发送器。但是,在实际应用示例中,我的接收项目未作为消费者收集消息。我认为我必须在Startup.cs中注册消费者,但我似乎无法使其工作。我的消费者/响应者代码:
public class RabbitMqHandler 
{
    private readonly IJWTFactory _jwtFactory;

    public RabbitMqHandler(IJWTFactory jWTFactory)
    {
        _jwtFactory = jWTFactory;

    }

    public void Register()
    {
        var mqFactory = new ConnectionFactory() { HostName = "localhost" };

        using (var connection = mqFactory.CreateConnection())
        {
            Console.WriteLine("Listening on Rabbit MQ");
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Authorize", durable: false, exclusive: false, autoDelete: false, arguments: null);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var jwtToken = Encoding.UTF8.GetString(body);

                    Console.WriteLine("Rceived Message");

                    var validatedToken = _jwtFactory.ValidateTokenSignature(jwtToken);

                    SendResponse(validatedToken);
                };
                channel.BasicConsume(queue: "Authorize", autoAck: true, consumer: consumer);
            }
        }
    }

    public void Deregister()
    {

    }

Startup.cs 用于注册 .AddSingleton()

编辑:我已经添加了一些额外的监听代码,这肯定是在启动时运行的,但是RabbitMQ没有将应用程序显示为消费者或通道:

public static class ApplicationBuilderExtentions
{
    public static RabbitMqHandler Listener { get; set; }

    public static IApplicationBuilder UseRabbitListener(this IApplicationBuilder app)
    {
        Listener = app.ApplicationServices.GetService<RabbitMqHandler>();

        var life = app.ApplicationServices.GetService<IApplicationLifetime>();

        life.ApplicationStarted.Register(OnStarted);

        //press Ctrl+C to reproduce if your app runs in Kestrel as a console app
        life.ApplicationStopping.Register(OnStopping);

        return app;
    }

    private static void OnStarted()
    {
        Listener.Register();
    }

    private static void OnStopping()
    {
        Listener.Deregister();
    }
}

总结一下:

  • 如何在 .NET Core 中正确配置消费者以消费消息?
  • 这只是期望消息队列管理请求/响应式通信的错误方法吗?
  • 我应该仅使用 API 调用来验证和授权用户吗?

1
那么到底是什么不起作用? consumer.Received 从未触发吗? - Evk
@Evk 似乎是这样,在我的服务器上我也看不到消费者。所以我甚至不确定 channel.basicConsume 是否起作用。 - Dandy
2
我看到问题了 - 你在消费开始后立即关闭了通道和连接。移除所有的using关键字,将connectionchannel移到RabbitMqHandler类的字段中,并在Unregister而不是在Register中关闭它们。 - Evk
@Evk 现在正在尝试!只是出于好奇,我在代码的哪个位置关闭了通道?是在 BasicConsume 还是因为 using - Dandy
1
好的,“using”是设计用来处理资源的,当你到达using块的结尾时,它会调用“Dispose”。 “BasicConsume”不是阻塞调用,因此它开始消耗并立即返回。紧接着,对于“channel”和“connection”,都到达了using块的结尾,将它们处理掉(处理掉与关闭相同)。 - Evk
@Evk 现在它的工作就像魔法一样。由于某种原因,我的大脑没有考虑到“使用”会关闭连接,尽管这是非常明显的事实。非常感谢你。 - Dandy
1个回答

1

以下是@Evk(在评论中)提供的答案:

using用于处理对象的释放,当代码块结束时,它会调用Dispose方法。 BasicConsume不是一个阻塞调用,因此它启动消费并立即返回。
接着,对于通道和连接,using代码块的结束将触发它们的处理(处理它们与关闭它们相同)。”

我想补充一下:

如果您想迅速尝试删除using是否能达到所需结果,则可以轻松更改以下代码行:

using (var connection = mqFactory.CreateConnection())

给:

var connection = mqFactory.CreateConnection();

这将立即解决问题。但请注意,它也会移除适当的处理方式 - 所以您需要添加 - 这里 是来自 Microsoft 的一篇文章,描述了如何正确实现 IDisposable

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接