如何使用Celery和Django将任务路由到不同的队列

29

我正在使用以下技术栈:

  • Python 3.6
  • Celery v4.2.1(Broker:RabbitMQ v3.6.0
  • Django v2.0.4

根据Celery文档的说法,将定时任务运行在不同队列上应该很容易,只需要在CELERY_ROUTES中为任务定义相应的队列即可。然而,所有任务似乎都在Celery的默认队列上执行。

以下是my_app/settings.py上的配置:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    },

}

这些任务只是用于测试路由的简单脚本:

文件app1/tasks.py


文件app1/tasks.py
from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

文件 app2/tasks.py:

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

当我使用所有必需的队列运行Celery时:

celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ会显示只有默认队列"celery"在运行任务:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

有人知道如何修复这种意外的行为吗?

谢谢,

3个回答

38

我已经让它工作了,这里有几个要注意的事项:

根据Celery 4.2.0文档CELERY_ROUTES应该是定义队列路由的变量,但是对我来说只有使用CELERY_TASK_ROUTES才有效。任务路由似乎与Celery Beat无关,因此这仅适用于手动安排的任务:

app1_test.delay()
app2_test.delay()
或者
app1_test.apply_async()
app2_test.apply_async()

为了使其与Celery Beat协同工作,我们只需要在CELERY_BEAT_SCHEDULE变量中明确定义队列。文件my_app/settings.py的最终设置如下:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = {
 'app1.tasks.*': {'queue': 'queue1'},
 'app2.tasks.*': {'queue': 'queue2'},
}
CELERY_BEAT_SCHEDULE = {
    'app1_test': {
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': {'queue': 'queue1'}
    },
    'app2_test': {
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': {'queue': 'queue2'}
    },

}

并且要运行Celery监听这两个队列:

celery -A my_app worker -B -l INFO -Q queue1,queue2

何处

  • -A: 项目或应用的名称。
  • -B: 启动任务调度程序Celery beat
  • -l: 定义日志记录级别。
  • -Q: 定义此工作程序处理的队列。

希望这能为其他开发者节省一些时间。


23
CELERY_ROUTESCELERY_TASK_ROUTES 的区别解释:CELERY_ROUTES 是旧版 Celery 设置的名称,现在已被 task_routes 取代。但是在 Django 设置文件中,Celery 设置必须使用大写字母(例如 TASK_ROUTES)。为了避免与其他 Django 设置冲突,建议使用 CELERY_ 前缀来设置 celery 配置,即使用 CELERY_TASK_ROUTES。要加载这些设置,可以使用以下方式:app.config_from_object('django.conf:settings', namespace='CELERY')。因此,CELERY_TASK_ROUTES 只是新设置名称的大写和前缀修改。 - sparrowt
对于那些想了解 Celery Beat 的人,它并不独立于任务路由,但它应该同样有效。 - Reza
不,我认为我们不需要在设置文件中定义“options”参数,如果您只是在命令中删除其他队列名称之前的celery,那么它将正常工作,因为您已经在settings.py文件中定义了CELERY_ROUTES。 - Chetan Vashisth
2
这在 celery v5.0 中也能很好地工作,谢谢。 - Ham
喜欢这个答案!对我来说在2022年1月的v4.4.0中完美运行,@sparrowt的评论和解释使用CELERY_TASK_ROUTES而不是CELERY_ROUTES简直棒极了:D - pyofey

26

queue参数添加到装饰器可能会对您有所帮助,

@app.task(<b>queue='queue1'</b>)
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

4
谢谢@JPG,这确实是个可行的替代方案,然而我更喜欢在Django设置文件中定义队列,以获得更大的灵活性。这样我可以根据环境使用不同的队列名称:测试、暂存和生产。 - Ander
5
我放弃了配置选项"celery_task_default_queue"、"task_default_queue"和"task_routes"。这些选项都没有起作用。唯一有效的是装饰器,谢谢JPG。 - Researcher

1

好的,我尝试了与你使用的运行worker相同的命令,发现你只需将"-Q"参数后面的"celery"删除就可以了。

因此,旧命令是

celery -A my_app worker -B -l info -Q celery,queue1,queue2

新命令是

celery -A my_app worker -B -l info -Q queue1,queue2

应该始终指定队列名称和工作器的名称。例如,“celery -A my_app worker -Q my_queue,my_other_queue -P threads --task-events -c 40 -l INFO -B --scheduler django_celery_beat.schedulers:DatabaseScheduler -n celery@%h” - TheZeke

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接