在编码和搜索论坛一周后,现在询问似乎是及时的...
我有一个C#应用程序,使用EventingBasicConsumer处理RabbitMQ发送的消息。我想并发处理多个消息,所以我在同一连接上实例化了几个频道(在这种情况下为8个),每个频道都有一个单独的消费者。然后,我为每个消费者的Received事件附加了一个事件处理程序。根据我到目前为止的所有阅读,这种设置应该允许事件处理程序被消费者并发触发,每个运行在其自己的线程中。但在我的情况下,消费者仅在先前的消费者确认其消息后依次接收消息。
有其他人经历过这种行为吗?我是否正确理解,在这种情况下处理技术上应该是并发的?
以下是一个基本代码,以更好地说明这个问题:
我希望看到的是这样的内容:// 并发处理
我有一个C#应用程序,使用EventingBasicConsumer处理RabbitMQ发送的消息。我想并发处理多个消息,所以我在同一连接上实例化了几个频道(在这种情况下为8个),每个频道都有一个单独的消费者。然后,我为每个消费者的Received事件附加了一个事件处理程序。根据我到目前为止的所有阅读,这种设置应该允许事件处理程序被消费者并发触发,每个运行在其自己的线程中。但在我的情况下,消费者仅在先前的消费者确认其消息后依次接收消息。
有其他人经历过这种行为吗?我是否正确理解,在这种情况下处理技术上应该是并发的?
以下是一个基本代码,以更好地说明这个问题:
Initialise() {
ConsumerChannels_ = new IModel[ConsumerCount_];
Consumers_ = new EventingBasicConsumer[ConsumerCount_];
for (int i = 0; i < ConsumerCount_; ++i)
{
ConsumerChannels_[i] = Connection_.CreateModel();
Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
Consumers_[i].Received += MessageReceived;
}
}
MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
int id = GetConsumerIndex(sender);
Log_.Debug("Consumer " + id + ": processing started...");
// do some time consuming processing here
sender.Model.BasicAck(e.DeliveryTag, false);
Log_.Debug("Consumer " + id + ": processing ended.");
}
我希望看到的是这样的内容:// 并发处理
但是我得到的却是:// 顺序处理消费者1:处理开始...
消费者2:处理开始...
消费者3:处理开始...
...
消费者6:处理结束。
消费者7:处理结束。
消费者8:处理结束。
有什么想法可以继续实现吗?消费者1:处理开始...
消费者1:处理结束。
消费者2:处理开始...
消费者2:处理结束。
...
消费者8:处理开始...
消费者8:处理结束。