如何在不知道每个任务的task_id
的情况下删除所有待处理任务?
对于 celery 3.0+ 版本:
$ celery purge
清除特定队列的内容:
$ celery -Q queue_name purge
celery -A proj purge
。 - Kamil Sindi当使用 -Q 参数来定义队列时,例如使用 worker:
celery worker -Q queue1,queue2,queue3
如果你不能传递队列参数,那么celery purge
将无法工作。它只会删除默认队列。
解决方法是使用如下--purge
参数启动你的worker:
--purge
参数启动你的worker:celery worker -Q queue1,queue2,queue3 --purge
然而,这会运行worker。
另一个选项是使用Celery的amqp子命令。
celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
Celery 3+
命令行界面:
$ celery -A proj purge
通过编程:
>>> from proj.celery import app
>>> app.control.purge()
我发现对于我的复杂Celery配置,celery purge
不起作用。 我使用多个命名队列来实现不同的目的:
$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ... # Output sorted, whitespaced for readability
celery 0 2
celery@web01.celery.pidbox 0 1
celery@web02.celery.pidbox 0 1
apns 0 1
apns@web01.celery.pidbox 0 1
analytics 1 1
analytics@web01.celery.pidbox 0 1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1
第一列是队列名称,第二列是等待队列中的消息数量,第三列是该队列的侦听器数量。队列如下:
分析任务是一项暴力任务,对小数据集效果很好,但现在处理时间超过24小时。偶尔会出现问题,它会卡在等待数据库上。需要重新编写,但在此之前,当它被卡住时,我会终止任务,清空队列,然后重试。我通过查看分析队列的消息计数来检测“卡住”,它应该为0(已完成分析)或1(等待昨晚的分析完成)。2或更高是不好的,我会收到一封电子邮件。
celery purge
提供了从其中一个广播队列中删除任务的选项,但我没有看到选择其他命名队列的选项。
这是我的处理过程:
$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
celeryctl purge
无法使用命名队列。但是 python manage.py celery amqp queue.purge <queue_name>
可以。对于那些有复杂设置的人来说,我认为上下文很有用,这样他们就可以在 celeryctl purge
失败时找出需要执行的操作。 - jwhitlockmanage.py
,这个文件被删除了还是全新的?我在*/bin/amqp.py
中找到了看起来对应的接口(queue.purge
)。但是在尝试将文件内容与文档进行关联后,我必须遗憾地承认,Celery文档非常匮乏,而且根据其源代码判断,它也是一个非常复杂的工作。 - Armen Michaelimanage.py
是 Django 的管理脚本,而 manage.py celery
则会在从 Django 设置中加载配置后运行 celery。我没有在 Django 之外使用过 celery,但是包含的 celery
命令可能是您正在寻找的:http://celery.readthedocs.org/en/latest/userguide/monitoring.html - jwhitlockfrom proj.celery import app
from celery.task.control import inspect, revoke
# remove pending tasks
app.control.purge()
# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
tasks = jobs[hostname]
for task in tasks:
revoke(task['id'], terminate=True)
在Celery 3+中
http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks
命令行界面(CLI)
清除指定队列:
celery -A proj amqp queue.purge <queue name>
celery -A proj purge
对于Celery 5.0+,要从CLI执行并针对特定队列进行清除:
celery -A APP_NAME purge --queues QUEUE_NAME
如果您尝试在一步中完成此操作,则在末尾添加-f
选项以跳过确认步骤。
celery 4+ 使用celery purge命令可以清除所有配置的任务队列
celery -A *APPNAME* purge
以程序化方式:
from proj.celery import app
app.control.purge()
所有待处理任务将被清除。 参考:Celery文档
针对使用RabbitMQ作为消息代理的Celery版本5.0+,我们需要先从程序到代理建立一个新连接,并将该连接与需要清除的队列绑定。
# proj/celery.py
from celery import Celery
app = Celery('proj')
from proj.celery import app
queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
conn.connect()
for queue in queues:
count = app.amqp.queues[queue].bind(conn).purge()
print(f'Purge {queue} with {count} message(s)')
manage.py celery purge
(celeryctl
现已停用,并将在3.1中消失)。 - Henrik Heimbuergerredis-cli KEYS "celery*" | xargs redis-cli DEL
,这对我很有效。这会清除您正在使用的Redis后端中存储的所有任务。 - Melignusinspect scheduled
命令查看它们)。 - JasonGenX