我有一个简单的RabbitMQ发布者和消费者代码,如下所示。
首先,在My_Tasks队列中创建了10个不同的消息计数。 当我尝试逐个获取这些消息并使用autoAck标志为false时,我可以读取第一条消息,但是无法将确认发送到RabbitMQ服务器。 我得到了下面写的错误;
发布者代码:
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
var body = Encoding.UTF8.GetBytes(message);
var prop = channel.CreateBasicProperties();
prop.Persistent = true;
channel.BasicPublish("", routingKey: qName, prop, body);
}
}
消费者;
var qName = "My_Tasks";
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(qName, durable: true, false, false, null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(qName, autoAck: false, consumer);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
channel.BasicAck(ea.DeliveryTag, multiple: false);
};
}
}
RabbitMQ.Client.Exceptions.AlreadyClosedException: '已关闭:AMQP操作被中断:AMQP关闭原因,由应用程序发起,code=200,text='Goodbye',classId=0,methodId=0'
在RabbitMQ.Client.Impl.SessionBase.Transmit(Command cmd)发生异常 在RabbitMQ.Client.Impl.ModelBase.ModelSend(MethodBase method, ContentHeaderBase header, ReadOnlyMemory 1 body)内部进行了传输操作 在RabbitMQ.Client.Framing.Impl.Model.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在RabbitMQ.Client.Impl.RecoveryAwareModel.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在RabbitMQ.Client.Impl.AutorecoveringModel.BasicAck(UInt64 deliveryTag, Boolean multiple)中进行了消息确认操作 在QueueExample.Consumer.Program.<>c__DisplayClass0_0.b__0(Object model, BasicDeliverEventArgs ea)中发生异常,在D:\Projects\RabbitMQTutorial\QueueExample\QueueExample.Consumer\Program.cs的第36行 在RabbitMQ.Client.Events.EventingBasicConsumer.HandleBasicDeliver(String consumerTag, UInt64 deliveryTag, Boolean redelivered, String exchange, String routingKey, IBasicProperties properties, ReadOnlyMemory`1 body)执行消息传递操作 在RabbitMQ.Client.Impl.ConcurrentConsumerDispatcher.<>c__DisplayClass10_0.b__0()中进行了异步操作
感谢您的帮助。