如何在Celery中取消已经执行的任务?

130

我一直在阅读文档并搜索,但似乎找不到一个明确的答案:

您能取消已经执行的任务吗?(即任务已经开始运行,需要一段时间,但在其中途需要取消它)

我在Celery FAQ中找到了这个。

>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

但我不清楚这是否会取消排队的任务或者是杀死正在运行的工作进程。感谢您能提供的任何帮助!

9个回答

240

revoke指取消任务的执行。如果任务被取消,工作者将忽略该任务并不执行它。如果您没有使用持久的取消功能,在工作者重新启动后可能会执行您的任务。

https://docs.celeryq.dev/en/stable/userguide/workers.html#worker-persistent-revokes

revoke有一个终止选项,默认值为False。如果您需要终止正在执行的任务,则需要将终止选项设置为True

>>> from celery.task.control import revoke
>>> revoke(task_id, terminate=True)

https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks


2
这在分布式环境中可行吗?我的意思是,如果我有多台机器上的工作人员正在执行任务。Celery是否会跟踪任务正在执行的机器? - ksrini
1
是的。与工作进程的通信是通过代理进行的。 - mher
6
result.revoke(terminate=True)应该与revoke(task_id, terminate=True)执行相同的操作。 - CamHart
16
根据最近的Celery文档,使用终止选项是“管理员的最后手段”。这会存在风险,可能会终止在该工作进程上最近启动的其他任务。 - kouk
2
它不起作用,>>> app.control.revoke(task_id)这个有效。 - A.Raouf
显示剩余4条评论

59
在Celery 3.1中,撤销任务的API已经更改。
根据Celery FAQ,您应该使用result.revoke:
>>> result = add.apply_async(args=[2, 2], countdown=120)
>>> result.revoke()

或者如果你只有任务 ID:

>>> from proj.celery import app
>>> app.control.revoke(task_id)

38

@0x00mh的回答是正确的,但是最近的Celery 文档指出,使用terminate选项是“管理员的最后手段”,因为您可能会意外终止另一个同时执行的任务。可能更好的解决方案是将terminate=Truesignal='SIGUSR1'相结合(这会导致在任务中引发SoftTimeLimitExceeded异常)。


4
这个解决方案对我非常有效。当我的任务触发了SoftTimeLimitExceeded异常时,我的自定义清理逻辑(通过try/except/finally实现)会被调用。在我看来,这比AbortableTask提供的功能要好得多(http://docs.celeryproject.org/en/latest/reference/celery.contrib.abortable.html)。使用后者需要一个数据库结果后端,并且您必须手动反复检查正在进行的任务的状态,以查看它是否已被中止。 - David Schneider
5
如果我的理解正确的话,这种方式有什么好处呢?似乎无论进程中是否有其他任务正在执行,它都会被停止,只不过抛出的异常可能不同罢了。 - marxin
如果我使用 worker_prefetch_multiplier = 1,因为我只有一些长时间运行的任务,终止应该是可以的 - 因为没有其他任务会受到影响 - 我理解得对吗?@spicyramen - maffe

8

根据5.2.3文档,可以运行以下命令:

    celery.control.revoke(task_id, terminate=True, signal='SIGKILL')

代码中的 celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 用于创建并配置一个新的 Celery 实例,该实例将用于管理你的任务。

有关更多信息,请查看文档


6
你可以像这样定义一个带有代理和后端的 Celery 应用程序:

您可以使用以下代码定义一个具有代理和后端的Celery应用:

from celery import Celery
celeryapp = Celery('app', broker=redis_uri, backend=redis_uri)

当您运行发送任务时,它会返回任务的唯一标识符:
task_id = celeryapp.send_task('run.send_email', queue = "demo")

要取消任务,您需要使用Celery应用程序和任务ID:

celeryapp.control.revoke(task_id, terminate=True)

唯一的答案对我有效。 - C.K.

4

这似乎是使用“线程”池时最好的方法,因为 celery.control.revoke(task_id, terminate=True, signal='SIGKILL') 不起作用。 - geometrikal

2
from celery.app import default_app

revoked = default_app.control.revoke(task_id, terminated=True, signal='SIGKILL')
print(revoked)

目前你的回答写得不够清晰,请编辑并添加更多细节,以帮助其他人理解它如何回答所提出的问题。你可以在帮助中心找到更多关于如何撰写好答案的信息。 - JayPeerachai

1
请参考以下任务选项:time_limitsoft_time_limit(或您可以为工作者设置它)。如果您想控制执行时间以外的内容,请查看apply_async方法的expires参数。

0
from celery.result import AsyncResult
task = AsyncResult(task_id)
task.revoke()

2
感谢您对Stack Overflow社区做出贡献的兴趣。这个问题已经有很多答案了,其中包括一个经过社区广泛验证的答案。您确定您的方法之前没有被提到过吗?如果是这样的话,能否请您解释一下您的方法有何不同,什么情况下您的方法可能更好,并且/或者为什么您认为之前的答案不够满意。您能否请编辑您的答案并提供解释? - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接