如何正确结束RabbitMq消费者并等待RabbitMq消费者线程(库消费者回调)在关闭连接和模型之后?

3

我有一个RabbitMq消费者(RabbitMQ.Client.Events.EventingBasicConsumer),它处理传入的消息。

但我注意到,如果关闭连接和模型,则它们不会等待完成库处理线程。例如:

  1. 如果我将线程睡眠几秒钟添加到EventingBasicConsumer :: Received 回调中,那么我注意到 Close 函数( IModel IConnection Close() )在退出此消费者回调之前完成
  2. 完成Close函数后,我继续接收一些消息到EventingBasicConsumer :: Received回调中。

那么如何正确关闭消费者并等待消费者线程中的所有处理完成呢? 我想确保在关闭所有连接/消费者后,我不会从库中收到任何传入的消息。

简化代码:

RunTest()
{
    MyConsumer consumer = new MyConsumer();
    consumer.Connect();
    
    // Wait before close for process some count of incoming messages
    Thread.Sleep(10 * 1000);
    
    consumer.Disconnect();
}

class MyConsumer
{
    private RabbitMQ.Client.IConnection     m_Connection = null;
    private RabbitMQ.Client.IModel          m_Channel = null;
        
    public void Connect()
    {
        //
        // ...
        //

        m_Channel = m_Connection.CreateModel();
        m_Consumer = new RabbitMQ.Client.Events.EventingBasicConsumer(m_Channel);
        m_Consumer.Received += OnRequestReceived;
        m_ConsumerTag = m_Channel.BasicConsume(m_Config.RequestQueue, false, m_Consumer);
    }
        
    public void Disconnect()
    {
        Console.WriteLine("---> IModel::Close()");
        m_Channel.Close();
        Console.WriteLine("<--- IModel::Close()");
        
        Console.WriteLine("---> RabbitMQ.Client.IConnection::Close()");
        m_Connection.Close();
        Console.WriteLine("<--- RabbitMQ.Client.IConnection::Close()");
        
        //
        // Maybe there is need to do some RabbitMQ API call of channel/model
        // for wait to finish of all consumer callbacks?
        //
        
        m_Channel = null;
        m_Connection = null;
    }

    private void OnRequestReceived(object sender, RabbitMQ.Client.Events.BasicDeliverEventArgs mqMessage)
    {
        Console.WriteLine("---> MyConsumer::OnReceived");
        
        Console.WriteLine("MyConsumer: ThreadSleep started");
        Thread.Sleep(10000);
        Console.WriteLine("MyConsumer: ThreadSleep finished");
        
        if (m_Channel != null)
        {
            m_Channel.BasicAck(mqMessage.DeliveryTag, false);
        }
        else
        {
            Console.WriteLine("MyConsumer: already closed");
        }
        
        Console.WriteLine("<--- MyConsumer::OnReceived");
    }
}

结果:

---> MyConsumer::OnReceived
MyConsumer: ThreadSleep started

---> IModel::Close() 
<--- IModel::Close() 
---> RabbitMQ.Client.IConnection::Close() 
<--- RabbitMQ.Client.IConnection::Close() 

MyConsumer: ThreadSleep finished
MyConsumer: already closed
<--- MyConsumer::OnReceived

---> MyConsumer::OnReceived
MyConsumer: ThreadSleep started
MyConsumer: ThreadSleep finished
MyConsumer: already closed
<--- MyConsumer::OnReceived

我们看到,在消费者和连接的Close()函数退出后,MyConsumer::OnReceived已经完成。此外,我们还看到在上一个OnReceived调用完成并关闭连接之后,还有一条消息进来了(这意味着RqbbitMq会继续处理消费者消息,直到内部库队列为空,而忽略消费者和连接已经关闭的事实)。

2个回答

1

这是RabbitMQ.Client(v5.1.2)中的一个真正的bug。ConsumerWorkService.cs的源代码如下:

namespace RabbitMQ.Client
{
    public class ConsumerWorkService
    {
        ...

        class WorkPool
        {
            readonly ConcurrentQueue<Action> actions;
            readonly AutoResetEvent messageArrived;
            readonly TimeSpan waitTime;
            readonly CancellationTokenSource tokenSource;
            readonly string name;

            public WorkPool(IModel model)
            {
                name = model.ToString();
                actions = new ConcurrentQueue<Action>();
                messageArrived = new AutoResetEvent(false);
                waitTime = TimeSpan.FromMilliseconds(100);
                tokenSource = new CancellationTokenSource();
            }

            public void Start()
            {
#if NETFX_CORE
                System.Threading.Tasks.Task.Factory.StartNew(Loop, System.Threading.Tasks.TaskCreationOptions.LongRunning);
#else
                var thread = new Thread(Loop)
                {
                    Name = "WorkPool-" + name,
                    IsBackground = true
                };
                thread.Start();
#endif
            }

            public void Enqueue(Action action)
            {
                actions.Enqueue(action);
                messageArrived.Set();
            }

            void Loop()
            {
                while (tokenSource.IsCancellationRequested == false)
                {
                    Action action;
                    while (actions.TryDequeue(out action))
                    {
                        try
                        {
                            action();
                        }
                        catch (Exception)
                        {
                        }
                    }

                    messageArrived.WaitOne(waitTime);
                }
            }

            public void Stop()
            {
                tokenSource.Cancel();
            }
        }
    }
}
        

正如我们所看到的,线程 var thread = new Thread(Loop) 没有任何等待。因此,实际上,RabbitMQ.Client.Events.EventingBasicConsumer::Received 事件可以在任何时间被触发,即使没有消费者或连接已经关闭了很长时间,直到内部库队列为空为止。就像我所猜测的(:

Action action;
while (actions.TryDequeue(out action))
{
    try
    {
        action();
    }
    catch (Exception)
    {
    }
}

所以IModel::Close()只会设置CancelationToken而不会加入线程,因此需要一些解决方法来解决此问题。

0

仅为确认@Alexander的发现,该问题在.Net Client v6.2.2中仍然存在,并且事件驱动回调消费者也存在该问题。

我发现:

  • 使用EventingBasicConsumerAsyncEventingBasicConsumer注册的“Received”回调会在连接关闭后对通道上设置的prefetchCount设置的次数内被调用BasicQos
  • 在手动Ack模式下,如果连接关闭,则调用BasicAck将抛出RabbitMQ.Client.Exceptions.AlreadyClosedException,并且消息不会从队列中确认,但回调将继续处理后续消息。这可能导致在断开连接期间接收到相同消息,从而产生幂等性问题。

这可能是确保prefetchCount设置为有限且合理值(除了无界预取计数的内存考虑因素外)的另一个好原因。

最后,如果我故意意外关闭连接(例如在测试期间),我发现我需要显式分离我的消费者Received处理程序并显式调用consumerChannel.Close();(即IModel.Close)才能创建新的连接(Rabbit客户端往往会“挂起”)。

  • 如果我使用同步EventingBasicConsumer并设置了ConsumerDispatchConcurrency>1,则connection.ConnectionShutdown事件似乎不可靠地触发。
  • 当使用异步消费者时,如果我设置了ConsumerDispatchConcurrency>1,则AsyncEventingBasicConsumer上的Shutdown事件也不会触发。
  • 但是,我发现IModel /通道上的ModelShutdown事件对于同步/异步和并发性>1始终可靠触发。
consumerChannel.ModelShutdown += (sender, args) =>
{
    consumer.Received -= handler; // e.g. AsyncEventingBasicConsumer
    consumerChannel.Close(); // IModel
};

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接