RabbitMQ、.NET和多线程

3
我已经使用RabbitMQ和它的.NET连接器玩了一段时间。 经过测试期,我的新系统在网络应用程序和API的代理中使用RabbitMQ后开始投入生产。 所有的应用程序几分钟内都被卡住了。 我想我达到了某些依赖于操作系统的阈值或者弄乱了一些TCP堆栈(在阻塞后,甚至我的客户端的TCP/IP连接也无法再次连接到Web服务器)。
我还没有找到关于如何处理通过数十个进程和数千个线程进行的高强度流量的好素材。
我有一个系统,每秒产生10k+线程,每个线程必须通过连接丢弃一个消息,然后终止。 我只为所有这些消息设置了一对'catchers'。
已经采取了一些对策。 不为每个新线程使用一个新连接->声明连接工厂并使用静态连接(连接和通道的函数被认为是线程安全的)->已解决
以下是工厂的代码
    public static class Factory
    {
        private static IConnection sharedConnection_cl;
        public static IConnection SharedConnection_cl
        {
            get
            {
                if (sharedConnection_cl == null)
                {
                    sharedConnection_cl = GetRabbitMqConnection();
                }
               return sharedConnection_cl;
         }

    private static IConnection GetRabbitMqConnection()
            {
                ConnectionFactory connectionFactory = new             ConnectionFactory();
                connectionFactory.HostName = "x.y.z.w";
                connectionFactory.UserName = "notTheGuestUser";
                connectionFactory.Password = "abcdef";
                connectionFactory.VirtualHost = "/dedicatedHost";
                return connectionFactory.CreateConnection();
          }
不要使用所有可用的Erlang进程。在不到10分钟内,Erlang进程阈值就会达到(在同一连接上关闭通道不会触发服务器上相应Erlang进程的死亡)-> 为任何给定连接添加了最大通道数的阈值,并使用信号量保护访问。每隔一段时间,连接会被关闭并重新创建(当连接关闭时,相应的Erlang进程终止)-> 问题解决 以下是管理通道阈值的代码
public static class Factory
{
    private static IConnection sharedConnection_cl;
    private static int channelCount_int { get; set; }
    static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
    public static IConnection SharedConnection_cl
    {
        get
        {
            if (sharedConnection_cl == null)
            {
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            connectionAccessSemaphore_sem.WaitOne();
            if (channelCount_int > 10000)
            {

                sharedConnection_cl.Close();
                sharedConnection_cl = GetRabbitMqConnection();
                channelCount_int = 0;
            }
            else
            {
                channelCount_int++;
            }
            connectionAccessSemaphore_sem.Release();
            return sharedConnection_cl;
        }
    }

现在...超越增加操作系统标准阈值(这只会将必然的阻塞时间从几分钟延长到几个小时)...

有没有好的实践方法来管理连接和通道,以避免触发操作系统阈值和饱和趋势的出现?

感谢任何支持。


我只是为所有这些消息设置了几个“接收者”。你的意思是订阅者吗? - Alex G.
这可能会有所帮助 https://www.rabbitmq.com/memory.html,https://www.rabbitmq.com/persistence-conf.html - Alex G.
看看这个微服务架构,也许能对你有所帮助。https://msdn.microsoft.com/zh-cn/magazine/mt595752.aspx - Jaider
谢谢大家,但是增加阈值并不能解决问题,只能让我的系统多活10分钟。 - Ph user
1个回答

2

好的,解决方案已经存在。只需将信号量上移即可解决问题。我没有考虑到在系统重启时,当所有应用程序池都被关闭时,我在连接实例分配上存在并发问题。->已解决

public static class Factory{
private static IConnection sharedConnection_cl;
private static int channelCount_int { get; set; }
static Semaphore connectionAccessSemaphore_sem = new Semaphore(1, 1);
public static IConnection SharedConnection_cl
{
    get
    {
        connectionAccessSemaphore_sem.WaitOne();

        if (sharedConnection_cl == null)
        {
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }

        if (channelCount_int > 10000)
        {

            sharedConnection_cl.Close();
            sharedConnection_cl = GetRabbitMqConnection();
            channelCount_int = 0;
        }
        else
        {
            channelCount_int++;
        }
        connectionAccessSemaphore_sem.Release();
        return sharedConnection_cl;
    }
}

不确定为什么应用程序池的锁定会导致服务器上所有TCP连接被锁定。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接