RabbitMQ频道创建指南

17
我正在编写一个简单的类,用于通过RabbitMQ发送和接收消息。我已经阅读了许多关于RabbitMQ的指南、博客文章、白皮书等。大部分示例都将连接和通道包装在using块中,并通过实现它们为单例来加以矛盾地说明。特别是,关于通道,我看到有评论说,您不应该同时使用多个线程来使用单个通道。
我的库是用C#编写的单例模式,在第一次实例化时连接静态连接。
我想过为通道做同样的事情,但我打算使用同一库来允许发布/订阅多个交换机/队列。发布和订阅都可能来自多个线程。
最后我的问题是: 我该如何实现通道的创建? 按消息? 每个消费者有一个唯一的私有通道,发布者同步访问一个唯一的通道? 你懂我的意思。 请记住,我打算使用单个服务器,带有几十个消费者/发布者,不会更多。
6个回答

9

编辑(2016-1-26):通道不是线程安全的。关于这一点的文档已经在2015年4月2015年5月之间发生了变化。新的文本:

通道实例不能在线程之间共享。应用程序应该优先考虑为每个线程使用一个通道,而不是在多个线程之间共享同一个通道。虽然某些通道操作可以安全地并发调用,但某些操作则不行,并且会导致线路上的错误帧交错。在线程之间共享通道也会干扰 * 发布者确认。

从您的问题中可以看出,您没有预定义的、固定数量的线程,大部分都是发布/订阅RabbitMQ(在这种情况下,您可以考虑在线程初始化时创建通道,或使用 ThreadLocal<IModel>)。

如果同时进行的RabbitMQ操作很少或消息大小始终很小,您可以简单地在所有RabbitMQ发布/订阅操作周围放置lock(channel)。 如果您需要以交错方式传输多个请求-这就是通道的首要用途-使用任意线程,您可能需要创建一个通道池,例如ConcurrentQueue<IModel>,其中您可以将未使用的通道入队并在需要时出队。 通道创建开销非常低,从性能测试中我有一种感觉,即通道创建过程不涉及任何网络io,即似乎通道会在客户端第一次使用时自动在RabbitMQ服务器上创建。 编辑:感谢Pang,没有必要为每个操作打开一个通道,这样做非常低效,因为打开通道是网络往返。

旧版本 (2016-1-26之前):Java和.net实现的大部分已过时的细节:

关于通道和多个线程,这有点令人困惑,因为它取决于实现。

Java实现通道是线程安全的

通道实例可供多个线程使用。

但是:

当通道在多个线程之间共享时,确认未被正确处理。

.net实现通道不是线程安全的

如果多个线程需要访问特定的IModel实例,则应用程序应自行执行互斥操作。 IModel操作序列化不正确的症状包括但不限于:在线路上发送无效的帧序列,抛出NotSupportedExceptions等。除了Robin提供的有用答案外,在.NET实现中,您不能仅共享通道。

2
您可以共享一个Connection。但是在Connection之上,您“不能”共享IModel(Channel)。 - user2864740
@user2864740 谢谢。我修正了措辞。 - Evgeniy Berezovsky
1
有人能告诉我这是否仍然适用吗? - kuskmen

8

使用ASP.NET Core,您可以利用ObjectPool。创建一个IPooledObjectPolicy。

    using Microsoft.Extensions.ObjectPool;  
    using Microsoft.Extensions.Options;  
    using RabbitMQ.Client;  

    public class RabbitModelPooledObjectPolicy : IPooledObjectPolicy<IModel>  
    {  
        private readonly RabbitOptions _options;  

        private readonly IConnection _connection;  

        public RabbitModelPooledObjectPolicy(IOptions<RabbitOptions> optionsAccs)  
        {  
            _options = optionsAccs.Value;  
            _connection = GetConnection();  
        }  

        private IConnection GetConnection()  
        {  
            var factory = new ConnectionFactory()  
            {  
                HostName = _options.HostName,  
                UserName = _options.UserName,  
                Password = _options.Password,  
                Port = _options.Port,  
                VirtualHost = _options.VHost,  
            };  

            return factory.CreateConnection();  
        }  

        public IModel Create()  
        {  
            return _connection.CreateModel();  
        }  

        public bool Return(IModel obj)  
        {  
            if (obj.IsOpen)  
            {  
                return true;  
            }  
            else  
            {  
                obj?.Dispose();  
                return false;  
            }  
        }  
    }  

然后为ObjectPool配置依赖注入

services.AddSingleton<ObjectPoolProvider, DefaultObjectPoolProvider>();
services.AddSingleton(s =>
{
   var provider = s.GetRequiredService<ObjectPoolProvider>();
   return provider.Create(new RabbitModelPooledObjectPolicy());
});

您可以注入 ObjectPool<IModel>,并使用它。

var channel = pool.Get();
try
{
    channel.BasicPublish(...);
}
finally
{
    pool.Return(channel);
}

Sources:

https://www.c-sharpcorner.com/article/publishing-rabbitmq-message-in-asp-net-core/

https://developpaper.com/detailed-explanation-of-object-pools-various-usages-in-net-core/


4

它阐明了aqmp的内部机制。 目前,我的理解是:

A. 我可以从每个应用程序中使用单个共享的TCP连接来连接服务器(作为静态共享资源)。

B. 对于每个“任务”,我应该创建一个通道(例如,一个用于监听队列X,一个用于发布到交换机Y等),假设这些“任务”可以并行执行。

C. 或者,在单个应用程序中,我可以使用一个通道来处理所有内容,同时确保对它的访问被同步化 - 使用某种锁定机制,假设通道实际使用(锁定)的时间跨度相对非常短。


2

您是正确的,Channel不是线程安全的,不应该被多个线程访问。

如果您计划使用不同的队列,可以使用单个通道拥有多个队列。但是,如果您计划使用多个交换机(不确定为什么会需要多个),则必须在您的单例中跟踪多个交换机和通道,或将该责任转移给调用者。

我建议构建一个具有观察者模式的单例,并将所有RabbitMQ内容保存在其中,并引用您的调用者。然后,您还需要将调用封送到处理此事务的单个线程中,否则可能会出现通道对象问题。


2
我不能评论 C# 实现的细节,但了解 Amqp 通道旨在共享单个 TCP 连接,即实现多路复用。一个通道只能同时发送或接收一条消息,但一个连接可以同时在不同的通道上接收消息。假设你有两个大型的1GB文件要通过Amqp发送给单个消费者,这些消息可能会被拆分成10K块并以交错的方式发送。当你设置连接时,可以操纵默认的Amqp消息大小,这会影响你何时可能遇到交织;据我所知,此功能旨在帮助防止多个消费者共享连接时出现饥饿情况,其中一个消费者接收大型消息。

希望对你有所帮助。


0
消费者的并发考虑事项 对于库用户来说,有许多与并发相关的主题需要考虑。 在线程之间共享通道 应避免同时由多个线程使用IModel实例。应用程序代码应维护IModel实例的线程所有权的明确概念。 对于发布者来说,这是一个硬性要求:共享通道(即IModel实例)进行并发发布将导致协议级别的不正确帧交错。线程在发布时不应该共享通道实例

RabbitMQ 消费者的并发考虑事项


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接