我一直在阅读文档并搜索,但似乎找不到一个明确的答案:
您能取消已经执行的任务吗?(即任务已经开始运行,需要一段时间,但在其中途需要取消它)
我在Celery FAQ中找到了这个。
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
但我不清楚这是否会取消排队的任务或者是杀死正在运行的工作进程。感谢您能提供的任何帮助!
我一直在阅读文档并搜索,但似乎找不到一个明确的答案:
您能取消已经执行的任务吗?(即任务已经开始运行,需要一段时间,但在其中途需要取消它)
我在Celery FAQ中找到了这个。
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
但我不清楚这是否会取消排队的任务或者是杀死正在运行的工作进程。感谢您能提供的任何帮助!
revoke指取消任务的执行。如果任务被取消,工作者将忽略该任务并不执行它。如果您没有使用持久的取消功能,在工作者重新启动后可能会执行您的任务。
https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes
revoke有一个终止选项,默认值为False。如果您需要终止正在执行的任务,则需要将终止选项设置为True。
>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)
https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()
或者如果你只有任务 ID:
>>> from proj.celery import app
>>> app.control.revoke(task_id)
@0x00mh的回答是正确的,但是最近的Celery 文档指出,使用terminate
选项是“管理员的最后手段”,因为您可能会意外终止另一个同时执行的任务。可能更好的解决方案是将terminate=True
与signal='SIGUSR1'
相结合(这会导致在任务中引发SoftTimeLimitExceeded异常)。
SoftTimeLimitExceeded
异常时,我的自定义清理逻辑(通过try
/except
/finally
实现)会被调用。在我看来,这比AbortableTask
提供的功能要好得多(http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html)。使用后者需要一个数据库结果后端,并且您必须手动反复检查正在进行的任务的状态,以查看它是否已被中止。 - David Schneiderworker_prefetch_multiplier = 1
,因为我只有一些长时间运行的任务,终止应该是可以的 - 因为没有其他任务会受到影响 - 我理解得对吗?@spicyramen - maffe根据5.2.3文档,可以运行以下命令:
celery.control.revoke(task_id, terminate=True, signal='SIGKILL')
代码中的 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
用于创建并配置一个新的 Celery 实例,该实例将用于管理你的任务。
有关更多信息,请查看文档。
您可以使用以下代码定义一个具有代理和后端的Celery应用:
from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)
task_id = celeryapp.send_task('run.send_email', queue = "demo")
要取消任务,您需要使用Celery应用程序和任务ID:
celeryapp.control.revoke(task_id, terminate=True)
此外,还有另一种方法(中止任务)来停止任务,但存在很多不可靠性。了解更多详细信息,请参见: http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html
from celery.app import default_app
revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)
from celery.result import AsyncResult
task = AsyncResult(task_id)
task.revoke()
>>> app.control.revoke(task_id)
这个有效。 - A.Raouf