如何使用Redis作为代理来清除Celery队列中的任务

10

部分1

我已经阅读并尝试了多个SO线程来使用Redis清除celery任务,但它们都没有起作用。请告诉我如何使用Redis作为代理程序在Celery中清除任务。

部分2

此外,我有多个队列。我可以在项目目录中运行它,但是在守护进程化时,工作人员不会接受任务。我仍然需要手动启动celery工作人员。如何将其守护化?

这是我的celerd配置。

# Name of nodes to start, here we have a single node
CELERYD_NODES="w1 w2 w3 w4"


CELERY_BIN="/usr/local/bin/celery"

# Where to chdir at start.
CELERYD_CHDIR="/var/www/fractal/parser-quicklook/"

# Python interpreter from environment, if using virtualenv
#ENV_PYTHON="/somewhere/.virtualenvs/MyProject/bin/python"

# How to call "manage.py celeryd_multi"
#CELERYD_MULTI="/usr/local/bin/celeryd-multi"

# How to call "manage.py celeryctl"
#CELERYCTL="/usr/local/bin/celeryctl"

#CELERYBEAT="/usr/local/bin/celerybeat"

# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8  -Q BBC,BGR,FASTCOMPANY,Firstpost,Guardian,IBNLIVE,LIVEMINT,Mashable,NDTV,Pandodaily,Reuters,TNW,TheHindu,ZEENEWS "

# Name of the celery config module, don't change this.
CELERY_CONFIG_MODULE="celeryconfig"

# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"

# Workers should run as an unprivileged user.
#CELERYD_USER="nobody"
#CELERYD_GROUP="nobody"

# Set any other env vars here too!
PROJET_ENV="PRODUCTION"

# Name of the projects settings module.
# in this case is just settings and not the full path because it will change the dir to
# the project folder first.
CELERY_CREATE_DIRS=1

Celeryconfig已经在part1中提供。

这是我的项目目录结构。

project
|-- main.py
|-- project
|   |-- celeryconfig.py
|   |-- __init__.py
|-- tasks.py

如何使用队列进行守护进程? 我已经在CELERYD_OPTS中提供了队列。

有没有一种方法可以动态地将celery队列数量设置为守护进程?例如:我们有CELERY_CREATE_MISSING_QUEUES = True用于创建缺失的队列。是否有类似的方法来守护celery队列?


关于守护进程:请提出另一个问题。同时,请阅读文档http://celery.readthedocs.org/en/latest/tutorials/daemonizing.html。 - Capi Etheriel
我按照文档说明按要求设置好了一切。但是当我查看sudo service celeryd status时,它显示“celeryd未运行(无pidfile)”。我该如何解决? - Praful Bagai
5个回答

14

celery purge 应该足以清理Redis队列。但是,您的worker将具有其自己保留的任务并且在停止worker时会将它们发送回队列。因此,首先停止所有workers。然后运行celery purge


13

如果您有多个队列,celery purge 将清除默认队列。您可以指定要清除的队列,如下所示:

celery purge -A proj -Q queue1,queue2

现在已经发生了改变,正确的命令是 celery -A proj purge -Q queue1,queue2-A 不再作为子命令的参数支持。 - Navid Khan

2

从 Celery v5 开始,您现在应该使用:

celery -A proj purge -Q queue1,queue2

2

针对第一部分,有一个编程解决方案可以清除您的队列,更多文档可以在以下链接中找到:celery.app.control.purge文档

from celery import Celery

app = Celery()
app.control.purge()
#OR
app.control.discard_all()

这个不起作用,我收到一个错误消息:TypeError: 'Celery' object is not iterable - Robert Franklin

1

这将撤销所有可能的任务,而不终止任何进程。(如果要这样做,请在revoke调用中添加terminate=True,但自担风险。)

它需要一两秒钟才能运行,因此不适合高吞吐量代码。

from myapp.celery import app as celery_app


celery_app.control.purge()

i = celery_app.control.inspect()
# scheduled(): tasks with an ETA or countdown
# active():    tasks currently running - probably not revokable without terminate=True
# reserved():  enqueued tasks - usually revoked by purge() above
for queues in (i.active(), i.reserved(), i.scheduled()):
    for task_list in queues.values():
        for task in task_list:
            task_id = task.get("request", {}).get("id", None) or task.get("id", None)
            celery_app.control.revoke(task_id)

只需使用.purge()然后撤销.scheduled()可能会有相同的效果,老实说,我还没有进行过广泛的实验。但是仅清除操作将无法撤消在队列中设置了ETA或倒计时的任务。

感谢@kahlo的答案,这是本文的基础。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接