我正在寻找一种从我的Django应用程序向RabbitMQ服务器发布消息的方法。这不是为了任务卸载,因此我不想使用Celery。目的是使用Django应用程序发布到交换机,并在Docker容器中拥有一个姐妹(非Django)应用程序从该队列消费。
这一切似乎非常简单,然而,即使没有明确调用,我似乎无法发布到交换机而不建立和关闭连接。
为了解决这个问题,我定义了一个类,其中包含一个嵌套的单例类,使用Pika维护与RabbitMQ服务器的连接。想法是嵌套单例将只实例化一次,在那时声明连接。每当要发布到队列时,单例处理它。
我会在django应用程序中需要时导入rmq_connection。在玩具应用程序和python repl中一切都正常,但每次在django应用程序中调用send函数时,都会建立一个新连接。然后连接会立即关闭,并显示“客户端意外关闭TCP连接”的消息。消息确实被正确发布到交换机中。
所以我确定django有些问题,可能与其如何处理进程等有关。问题仍然存在,如何在不重新建立连接的情况下向队列发布多个消息?
这一切似乎非常简单,然而,即使没有明确调用,我似乎无法发布到交换机而不建立和关闭连接。
为了解决这个问题,我定义了一个类,其中包含一个嵌套的单例类,使用Pika维护与RabbitMQ服务器的连接。想法是嵌套单例将只实例化一次,在那时声明连接。每当要发布到队列时,单例处理它。
import logging
import pika
import os
logger = logging.getLogger('django')
class PikaChannelSingleton:
class __Singleton:
channel = pika.adapters.blocking_connection.BlockingChannel
def __init__(self):
self.initialize_connection()
def initialize_connection(self):
logger.info('Attempting to establish RabbitMQ connection')
credentials = pika.PlainCredentials(rmq_username, rmq_password)
parameters = pika.ConnectionParameters(rmq_host, rmq_port, rmq_vhost, credentials, heartbeat=0)
connection = pika.BlockingConnection(parameters)
con_chan = connection.channel()
con_chan.exchange_declare(exchange='xchng', exchange_type='topic', durable=True)
self.channel = con_chan
def send(self, routing_key, message):
if self.channel.is_closed:
PikaChannelSingleton.instance.initialize_connection()
self.channel.basic_publish(exchange='xchng', routing_key=routing_key,
body=message)
instance = None
def __init__(self, *args, **kwargs):
if not PikaChannelSingleton.instance:
logger.info('Creating channel singleton')
PikaChannelSingleton.instance = PikaChannelSingleton.__Singleton()
@staticmethod
def send(routing_key, message):
PikaChannelSingleton.instance.send(routing_key, message)
rmq_connection = PikaChannelSingleton()
我会在django应用程序中需要时导入rmq_connection。在玩具应用程序和python repl中一切都正常,但每次在django应用程序中调用send函数时,都会建立一个新连接。然后连接会立即关闭,并显示“客户端意外关闭TCP连接”的消息。消息确实被正确发布到交换机中。
所以我确定django有些问题,可能与其如何处理进程等有关。问题仍然存在,如何在不重新建立连接的情况下向队列发布多个消息?