Celery任务优先级

17
我想使用 Celery 管理任务。我希望拥有一个单一的任务队列(并发数为 1),并能够按不同的优先级将任务推送到队列中,以使更高优先级的任务能够抢占其他任务。
我要像下面这样向队列添加三个任务:
add_tasks.py
from tasks import example_task

example_task.apply_async((1), priority=1)
example_task.apply_async((2), priority=3)
example_task.apply_async((3), priority=2)

我有以下配置:

tasks.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
from kombu import Queue, Exchange
import time

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

app.conf.task_queues = [Queue('celery', Exchange('celery'), routing_key='celery', queue_arguments={'x-max-priority': 10})]


@app.task
def example_task(task_num):
    time.sleep(3)
    print('Started {}'.format(task_num)
    return True

我希望我添加的第二个任务比第三个任务先运行,因为它具有更高的优先级,但实际上并没有。它们按照添加的顺序运行。

我正在遵循文档并认为我已正确配置应用程序。

我是做错了什么还是误解了优先级特性?

2个回答

11

有可能队列没有机会对消息进行优先排序(因为它们在排序之前被下载)。尝试使用这两个设置(根据您的项目进行适应):

CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

默认情况下,预取乘数为4。

我开发了一个示例应用程序来实现Celery的优先任务处理(规模非常小),请在此处查看。在开发过程中,我遇到了一个非常类似的问题,而这个设置更改实际上解决了它。

请注意,您还需要RabbitMQ版本3.5.0或更高版本。


感谢您的回答。我按照您描述的设置,完全按照文档下载并运行了您的项目,但仍然没有看到优先级行为起作用。 - EngineerCamp
@EngineerCamp 我再次克隆后进行了重新检查,似乎一切正常。这里有一个小的预览视频链接,只有1个Celery工作进程,以便轻松观察任务的排序。在视频开头(_当视频被转码为360p时_),请注意当单击480p和720p时,720p获得更高的优先级,即使先单击480p也是如此。 - Vijeth Aradhya
我又试了一次,但还是没有成功。我正在使用Python 3,并且在Ubuntu 14.04上,所以可能存在一些差异。 - EngineerCamp
2
@EngineerCamp 我的RabbitMQ版本是"3.5.7"。请在RabbitMQ网站Celery FAQ页面上检查,您会注意到您需要3.5.0或更高版本才能实现优先级队列。 如果不是这种情况,请告诉我 :) - Vijeth Aradhya
1
就是这样,原来我使用的是非常老的版本。我已经升级了,现在它完美地运行了。感谢您的帮助 :) - EngineerCamp
Celery的版本怎么样?如果我有新的RabbitMQ版本,我能在Celery 3.1中使用优先级吗? - ItayB

0
如果您使用的是6.0以上版本的celery,请使用这些
        celery_instance.conf.task_acks_late = True
        celery_instance.conf.worker_prefetch_multiplier = 1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接