使用哪种连接方式与Pika通信?

39

我一直在尝试弄清楚在使用pika时应该使用哪种连接方式,据我了解,有两个选择。

要么使用BlockingConnection,要么使用SelectConnection,但我不太确定这两者之间的区别(例如,BlockingConnection是什么阻塞?等等)

pika的文档称SelectConnection是连接到Rabbit的首选方法,因为它提供“多种事件通知方法,包括select、epoll、kqueue和poll”。

所以,我想知道这两种不同连接方式的影响是什么?

PS:我知道标题中不应该放置标签,但在这种情况下,我认为它确实有助于澄清问题。

2个回答

30

SelectConnection对于那些可以从异步设计中受益的应用架构非常有用,例如在RabbitMQ IO完成时做其他事情(例如切换到其他IO等)。这种类型的连接使用回调来指示函数返回的时间。例如,你可以声明on_connected、on_channel_open、on_exchange_declared、on_queue_declared等回调以在触发这些事件时执行操作。

如果你的RabbitMQ服务器(或连接到该服务器的连接)很慢或负载过高,则此功能尤其有用。

BlockingConnection只是阻塞直到调用的函数返回。因此,它将阻止执行线程,直到例如连接或channel_open或exchange_declared或queue_declared返回。也就是说,编写这种串行逻辑通常比异步SelectConnection逻辑更简单。对于响应灵敏的RabbitMQ服务器的简单应用程序,这些方法也可以正常工作。

我想你已经阅读了Pika文档 http://pika.readthedocs.io/en/stable/intro.html,如果没有,请在使用Pika之前务必阅读这些重要信息!

干杯!


6
Pika文档非常清楚地介绍了连接类型之间的区别。 主要区别是pika.adapters.blocking_connection.BlockingConnection()适配器用于非异步编程,而pika.adapters.select_connection.SelectConnection()适配器用于异步编程。

如果您不知道非异步/同步和异步编程之间的区别,我建议您阅读这个问题或者更深入的技术解释可以参考这篇文章

现在让我们深入了解不同的Pika适配器以及它们的作用,为了举例说明,我假设我们使用Pika来设置与RabbitMQ作为AMQP消息代理的客户端连接。

BlockingConnection()

在下面的示例中,使用用户名guest和密码guest以及虚拟主机'/'连接到监听本地主机上端口5672的RabbitMQ。 连接成功后,打开一个通道并使用test_routing_key路由键将消息发布到test_exchange交换器。 传递的BasicProperties值将消息设置为传送模式1(非持久化)并具有内容类型text/plain。 消息发布后,关闭连接:

import pika

parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.BlockingConnection(parameters)

channel = connection.channel()

channel.basic_publish('test_exchange',
                      'test_routing_key',
                      'message body value',
                      pika.BasicProperties(content_type='text/plain',
                                           delivery_mode=1))

connection.close()

SelectConnection()

相比之下,使用这个连接适配器需要更多的步骤,且不够Pythonic。但是,当与其他异步服务一起使用时,它可以有巨大的性能提升。在以下代码示例中,使用了与先前示例中相同的参数和值:

import pika

# Step #3
def on_open(connection):

    connection.channel(on_open_callback=on_channel_open)

# Step #4
def on_channel_open(channel):

    channel.basic_publish('test_exchange',
                            'test_routing_key',
                            'message body value',
                            pika.BasicProperties(content_type='text/plain',
                                                 delivery_mode=1))

    connection.close()

# Step #1: Connect to RabbitMQ
parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')

connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:

    # Step #2 - Block on the IOLoop
    connection.ioloop.start()

# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
except KeyboardInterrupt:

    # Gracefully close the connection
    connection.close()

    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed
    connection.ioloop.start()

结论

对于那些进行简单、非异步/同步编程的人来说,BlockingConnection()适配器是使用Pika发布消息最简单的方法。但是,如果你正在寻找一种实现异步消息处理的方法,那么SelectConnection()处理程序是更好的选择。

编码愉快!


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接