删除Celery/RabbitMQ中所有待处理的任务

244

如何在不知道每个任务的task_id的情况下删除所有待处理任务?

12个回答

397

根据文档

$ celery -A proj purge
或者
from proj.celery import app
app.control.purge()

(编辑:已更新为最新方法。)


63
或者,对于celery 3.0+,从Django中使用manage.py celery purgeceleryctl现已停用,并将在3.1中消失)。 - Henrik Heimbuerger
9
我在寻找如何将Redis作为后端来执行此操作时,遇到了这个答案。我发现最好的方法是使用命令 redis-cli KEYS "celery*" | xargs redis-cli DEL,这对我很有效。这会清除您正在使用的Redis后端中存储的所有任务。 - Melignus
1
我该如何在 Celery 3.0 中实现这个? - luistm
5
对我来说,只需要在相关的虚拟环境中运行“celery purge”命令。哎呀——下面有一个相同的答案……https://dev59.com/eGw05IYBdhLWcg3wmC0o#20404976 - Erve1879
这在计划任务上不起作用。在这样的“清除”之后,您仍然可以看到它们已经计划,它们将会在其预定时间运行(您可以使用inspect scheduled命令查看它们)。 - JasonGenX
显示剩余2条评论

151

对于 celery 3.0+ 版本:

$ celery purge

清除特定队列的内容:

$ celery -Q queue_name purge

11
如果您遇到连接错误,请确保指定应用程序,例如celery -A proj purge - Kamil Sindi
3
我相信 -Q 标志已被弃用(对我无效,“没有这个选项”),要在 Celery 5.0.5 上删除特定队列,您需要运行 celery -A appname purge --queues queuename。 - Thorvald

33

对于 Celery 2.x 和 3.x 版本:

当使用 -Q 参数来定义队列时,例如使用 worker:

celery worker -Q queue1,queue2,queue3

如果你不能传递队列参数,那么celery purge将无法工作。它只会删除默认队列。
解决方法是使用如下--purge参数启动你的worker:


解决方法是使用如下--purge参数启动你的worker:
celery worker -Q queue1,queue2,queue3 --purge

然而,这会运行worker。

另一个选项是使用Celery的amqp子命令。

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3

是的,这是针对较旧的(2.x和可能的3.x)版本的celery。我无法编辑答案。 - smido

19

10

我发现对于我的复杂Celery配置,celery purge 不起作用。 我使用多个命名队列来实现不同的目的:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

第一列是队列名称,第二列是等待队列中的消息数量,第三列是该队列的侦听器数量。队列如下:

  • celery-用于标准、幂等celery任务的队列
  • apns-用于Apple推送通知服务任务的队列,不太幂等
  • analytics-用于长时间运行的每晚分析的队列
  • *.pidbox-用于工作进程命令(例如关闭和重置)的队列,每个工作进程一个(2个celery工作进程,一个apns工作进程,一个analytics工作进程)
  • bcast.* - 广播队列,用于向所有监听队列的工作进程发送消息(而不仅仅是第一个抓取它的进程)
  • celeryev.* - Celery事件队列,用于报告任务分析

分析任务是一项暴力任务,对小数据集效果很好,但现在处理时间超过24小时。偶尔会出现问题,它会卡在等待数据库上。需要重新编写,但在此之前,当它被卡住时,我会终止任务,清空队列,然后重试。我通过查看分析队列的消息计数来检测“卡住”,它应该为0(已完成分析)或1(等待昨晚的分析完成)。2或更高是不好的,我会收到一封电子邮件。

celery purge 提供了从其中一个广播队列中删除任务的选项,但我没有看到选择其他命名队列的选项。

这是我的处理过程:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start

不过这不是一个答案,是吗?但是非常有信息量! - Armen Michaeli
4
celeryctl purge 无法使用命名队列。但是 python manage.py celery amqp queue.purge <queue_name> 可以。对于那些有复杂设置的人来说,我认为上下文很有用,这样他们就可以在 celeryctl purge 失败时找出需要执行的操作。 - jwhitlock
我在我的Celery 3.1.17中找不到manage.py,这个文件被删除了还是全新的?我在*/bin/amqp.py中找到了看起来对应的接口(queue.purge)。但是在尝试将文件内容与文档进行关联后,我必须遗憾地承认,Celery文档非常匮乏,而且根据其源代码判断,它也是一个非常复杂的工作。 - Armen Michaeli
manage.py 是 Django 的管理脚本,而 manage.py celery 则会在从 Django 设置中加载配置后运行 celery。我没有在 Django 之外使用过 celery,但是包含的 celery 命令可能是您正在寻找的:http://celery.readthedocs.org/en/latest/userguide/monitoring.html - jwhitlock

9
如果您想要删除所有待处理、活动和保留的任务,以彻底停止Celery,请按照以下步骤操作:
from proj.celery import app
from celery.task.control import inspect, revoke

# remove pending tasks
app.control.purge()

# remove active tasks
i = inspect()
jobs = i.active()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

# remove reserved tasks
jobs = i.reserved()
for hostname in jobs:
    tasks = jobs[hostname]
    for task in tasks:
        revoke(task['id'], terminate=True)

如果您还想撤销已计划的任务,即那些由于“eta”或“countdown”而等待的任务,您还需要撤销“i.scheduled()”队列中的任务。对于这些任务,ID位于“request”键内(至少对于我在Redis上是这样),即您需要运行“revoke(task['request']['id'])”。此外,对于我在Django中使用的Celery 5.2.7,我需要运行“app.control.inspect”和“app.control.revoke” - 我无法独立导入它们(会得到未绑定错误)。我的最终代码在这里 - Chris

8

在Celery 3+中

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

命令行界面(CLI)

清除指定队列:

 celery -A proj amqp queue.purge <queue name>

清空已配置的队列
celery -A proj purge

我已经清除了消息,但是队列里仍然有消息?
答案:任务在实际执行后立即被确认(从队列中删除)。工作进程接收到任务后,如果已经有很多任务等待执行,它将需要一些时间才能实际执行。未被确认的消息将由工作进程保留,直到关闭与代理服务器(AMQP 服务器)的连接。当该连接关闭时(例如因为工作进程停止),代理将重新发送任务以供下一个可用的工作进程处理(或在重新启动时发送给同一个工作进程)。因此,要正确地清除等待任务的队列,您必须停止所有工作进程,然后使用 celery.control.purge() 清除任务。
因此,要清除整个队列,必须停止工作进程。

4

对于Celery 5.0+,要从CLI执行并针对特定队列进行清除:

celery -A APP_NAME purge --queues QUEUE_NAME

如果您尝试在一步中完成此操作,则在末尾添加-f选项以跳过确认步骤。


3

celery 4+ 使用celery purge命令可以清除所有配置的任务队列

celery -A *APPNAME* purge

以程序化方式:

from proj.celery import app
app.control.purge()

所有待处理任务将被清除。 参考:Celery文档


2

针对使用RabbitMQ作为消息代理的Celery版本5.0+,我们需要先从程序到代理建立一个新连接,并将该连接与需要清除的队列绑定。

# proj/celery.py
from celery import Celery
app = Celery('proj')

from proj.celery import app
queues = ['queue_A', 'queue_B', 'queue_C']
with app.connection_for_write() as conn:
    conn.connect()
    for queue in queues:
        count = app.amqp.queues[queue].bind(conn).purge()
        print(f'Purge {queue} with {count} message(s)')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接