为什么CELERY_ROUTES同时有“queue”和“routing_key”?

46
我理解AMQP的消息只有以下几个组成部分:
  1. 消息正文
  2. 路由键
  3. 交换机
队列附加在交换机上,消息不可能知道队列的存在。它们只是发布到一个交换机,然后根据交换机类型和路由键将消息路由到一个或多个队列。
在Celery中,推荐路由任务的方法是通过 CELERY_ROUTES 设置。根据文档,CELERY_ROUTES 是...

用于将任务路由到队列的路由器列表或单个路由器。 http://celery.readthedocs.org/en/latest/configuration.html#message-routing

它包括一个示例...

要将任务路由到 feed_tasks 队列,可以在 CELERY_ROUTES 设置中添加一个条目:

CELERY_ROUTES = {
    'feeds.tasks.import_feed': {
        'queue': 'feed_tasks',
        'routing_key': 'feed.import',
    },
}
但是等一下——根据AMQP,消息仅带有路由键!那么“队列”到底是做什么用的呢?
此外,还有一个默认队列的概念。如果你调用了一个没有被CELERY_ROUTES捕捉到的任务,它会回退到CELERY_DEFAULT_QUEUE。但同样地,按照AMQP的规定,消息并不知道队列的存在。那这不应该是默认的路由键吗?
2个回答

19

确实,在Celery中当你转到队列时会有些混淆,需要记住的一件事是queue参数是指一个Celery Kombu Queue对象,而不是直接指向AMQP队列,可以通过阅读文档中的这篇文章来理解这一点。 当然,Celery创建具有相同名称的队列和交换机是造成queue参数使用混乱的根源。 在文档中你还可以阅读以下段落:

  

如果您有另一个队列,但在另一个交换机上,您想要添加该队列,只需指定自定义交换机和交换机类型:

CELERY_QUEUES = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                       routing_key='image.compress'),
)
因此,您可以在同一交换机上绑定2个不同的队列。 在使用交换机和键路由任务后,您可以使用Routers类。
class MyRouter(object):

    def route_for_task(self, task, args=None, kwargs=None):
        if task == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}
        return None

更多内容请参见http://celery.readthedocs.org/en/latest/userguide/routing.html#routers


不确定理解:调用Queue()实例化...是绑定而不是队列吗? - Jocelyn delalande
6
可以这样说,kombu中的Queue对象对应于使用特定路由键绑定到某个交换机的AMQP队列。 - Mauro Rocco

5

在这里声明队列的目的是让 celery 创建这些队列并使用 RabbitMQ 进行配置。

对于低级别的 AMQP 客户端,您需要首先声明队列,然后声明交换机,最后将交换机绑定到队列。稍后在发布消息时,只需发布到交换机即可。

看起来 celery 使用这个结构自动完成这个过程。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接