Django:如何建立与RabbitMQ的持久连接?

6
我正在寻找一种从我的Django应用程序向RabbitMQ服务器发布消息的方法。这不是为了任务卸载,因此我不想使用Celery。目的是使用Django应用程序发布到交换机,并在Docker容器中拥有一个姐妹(非Django)应用程序从该队列消费。
这一切似乎非常简单,然而,即使没有明确调用,我似乎无法发布到交换机而不建立和关闭连接。
为了解决这个问题,我定义了一个类,其中包含一个嵌套的单例类,使用Pika维护与RabbitMQ服务器的连接。想法是嵌套单例将只实例化一次,在那时声明连接。每当要发布到队列时,单例处理它。
import logging
import pika
import os

logger = logging.getLogger('django')

class PikaChannelSingleton:
    class __Singleton:
        channel = pika.adapters.blocking_connection.BlockingChannel

        def __init__(self):
            self.initialize_connection()

        def initialize_connection(self):
            logger.info('Attempting to establish RabbitMQ connection')

            credentials = pika.PlainCredentials(rmq_username, rmq_password)
            parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
            connection = pika.BlockingConnection(parameters)

            con_chan = connection.channel()
            con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
            self.channel = con_chan

        def send(self, routing_key, message):
            if self.channel.is_closed:
                PikaChannelSingleton.instance.initialize_connection()

            self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
                                                                body=message)
    instance = None

    def __init__(self, *args, **kwargs):
        if not PikaChannelSingleton.instance:
            logger.info('Creating channel singleton')
            PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()

    @staticmethod
    def send(routing_key, message):
        PikaChannelSingleton.instance.send(routing_key, message)

rmq_connection = PikaChannelSingleton()

我会在django应用程序中需要时导入rmq_connection。在玩具应用程序和python repl中一切都正常,但每次在django应用程序中调用send函数时,都会建立一个新连接。然后连接会立即关闭,并显示“客户端意外关闭TCP连接”的消息。消息确实被正确发布到交换机中。
所以我确定django有些问题,可能与其如何处理进程等有关。问题仍然存在,如何在不重新建立连接的情况下向队列发布多个消息?
1个回答

3
如果我理解正确,在单线程的环境下不能保持连接活跃。当您的Django应用程序继续执行时,amqp客户端不会在通道上发送心跳,连接将会断开。heartbeats
您可以使用SelectConnection而不是BlockingConnection,但这可能在Django的上下文中不容易实现。
一个很好的折衷方案可以简单地在单例中收集消息,但只在Django请求结束时使用BlockingConnection一次性发送所有消息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接