RabbitMQ中的并发性

7
在编码和搜索论坛一周后,现在询问似乎是及时的...
我有一个C#应用程序,使用EventingBasicConsumer处理RabbitMQ发送的消息。我想并发处理多个消息,所以我在同一连接上实例化了几个频道(在这种情况下为8个),每个频道都有一个单独的消费者。然后,我为每个消费者的Received事件附加了一个事件处理程序。根据我到目前为止的所有阅读,这种设置应该允许事件处理程序被消费者并发触发,每个运行在其自己的线程中。但在我的情况下,消费者仅在先前的消费者确认其消息后依次接收消息。
有其他人经历过这种行为吗?我是否正确理解,在这种情况下处理技术上应该是并发的?
以下是一个基本代码,以更好地说明这个问题:
Initialise() {
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
         ConsumerChannels_[i] = Connection_.CreateModel();
         Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
         Consumers_[i].Received += MessageReceived;
    }
}

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}

我希望看到的是这样的内容:// 并发处理

消费者1:处理开始...

消费者2:处理开始...

消费者3:处理开始...

...

消费者6:处理结束。

消费者7:处理结束。

消费者8:处理结束。

但是我得到的却是:// 顺序处理

消费者1:处理开始...

消费者1:处理结束。

消费者2:处理开始...

消费者2:处理结束。

...

消费者8:处理开始...

消费者8:处理结束。

有什么想法可以继续实现吗?

虽然您有多个消费者,但它们都在同一个线程上运行。您需要开启线程并为每个创建一个消费者。您也可以始终只有一个消费者,并多次运行处理应用程序。或者,这里有个不要脸的自荐,使用类似 Shuttle.Esb 这样的工具来完成繁重的工作 :) - Eben Roux
2个回答

3

创建 ConnectionFactory 时,您实际上可以设置并行处理任务的数量!

ConnectionFactory factory = new ConnectionFactory
{
    ConsumerDispatchConcurrency = 2,
};

默认值为1,即串行/顺序处理。

我通过解析.NET客户端源代码发现了这一点。以下是有趣的部分(concurrencyConsumerDispatchConcurrency设置):

Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
    _worker = Task.Run(loopStart);
}
else
{
    var tasks = new Task[concurrency];
    for (int i = 0; i < concurrency; i++)
    {
        tasks[i] = Task.Run(loopStart);
    }
    _worker = Task.WhenAll(tasks);
}

但是要注意,这可能会导致竞争条件!该属性有以下备注:

对于并发大于一的情况,这会移除消费者按照它们接收到的顺序处理消息的保证。此外,消费者需要线程/并发安全。


此外,我发现如果 QoS 预取大小 不是 >> 1,.Net Rabbit 客户端会忽略 / 限制 ConsumerDispatchConcurrency - StuartLC

1

你有两种方法来实现这个:

通过添加自己的线程池来增加并发性:

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e) {
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    // PUT your thread-pool here and process the messages inside the thread

    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended."); }

}

注意:可以在不同的线程中调用BasicAck
或者
您可以通过使用QoS=1将更多的消费者添加到队列中,这样可以循环消耗消息。

+1 @Gabriele. @Kia 我们基本上就是这样做的。这样做简化了操作:从RabbitMQ的角度来看,只有一个消费者。而且,不需要手动启动线程/管理ThreadPool。将消息处理作为任务放到TaskScheduler上可以处理所有这些问题。通过变化TaskScheduler的最大并行性,可以轻松地在同一消费者内扩展并行性(多种调度程序提供此功能)。专用线程池与共享默认ThreadPool之间的区别是一个更广泛的讨论,这取决于您拥有多少此类消费者以及所需的隔离程度。 - mountain traveller
添加自己的线程池解决了问题。知道我可以从自己的多个线程中调用BasicAck非常有帮助。对于任何感兴趣的人,我还引入了一个简单的阻塞节流机制(使用Monitor.Wait和Monitor.Pulse)来自定义控制并发级别,使用channel.BasicQos(0、10000、false)。 - Kia

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接