RabbitMQ消费者:AlreadyClosedException

5

我有一个简单的RabbitMQ发布者和消费者代码,如下所示。

首先,在My_Tasks队列中创建了10个不同的消息计数。 当我尝试逐个获取这些消息并使用autoAck标志为false时,我可以读取第一条消息,但是无法将确认发送到RabbitMQ服务器。 我得到了下面写的错误;

发布者代码:

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);

        var body = Encoding.UTF8.GetBytes(message);

        var prop = channel.CreateBasicProperties();
        prop.Persistent = true;

        channel.BasicPublish("", routingKey: qName, prop, body);
    }
}

消费者;

var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(qName, autoAck: false, consumer);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };
    }
}

RabbitMQ.Client.Exceptions.AlreadyClosedException: '已关闭:AMQP操作被中断:AMQP关闭原因,由应用程序发起,code=200,text='Goodbye',classId=0,methodId=0'
在RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)发生异常 在RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body)内部进行了传输操作 在RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在RabbitMQ.Client.Impl.AutorecoveringModel.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在QueueExample.Consumer.Program.<>c__DisplayClass0_0.b__0(Object model, BasicDeliverEventArgs ea)中发生异常,在D:\Projects\RabbitMQTutorial\QueueExample\QueueExample.Consumer\Program.cs的第36行 在RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body)执行消息传递操作 在RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()中进行了异步操作
感谢您的帮助。
2个回答

2
我找到了解决错误的方法。我的示例应用程序是一个控制台应用程序,因此在消费者接收消息时,连接已经关闭了。当我写下 Connsole.ReadLine() 等待响应以关闭连接时,消费者就可以读取所有消息。
这是临时解决方案,您可以从这个角度找到自己的解决方案。
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare(qName, durable: true, false, false, null);
        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(qName, autoAck: false, consumer);

        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };

        //Solution is here
        Console.WriteLine("Press Any Key to Continue..");
        Console.ReadLine();
    }
}

1

这应该是一条注释,但是我缺少声望。

你的解决方案已经工作了吗?你在哪里定义与代理的连接?你是否在docker-compose配置中使用代理?如果是这种情况,你的代理必须连接到

rabbitmq://host.docker.internal

当我开始在 .Net Core 中使用 RabbitMQ 时,我遇到了同样的问题。由于我的应用程序(微服务模式)处于早期阶段,我转而使用 MassTransit framework 处理所有事情。我强烈推荐它,因为它“只需运行”,您可以完全专注于所需的系统功能。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接