我能在使用任务装饰器创建的Celery任务中添加一个on_failure回调函数吗?

17

我相当确定只能通过创建自己的任务类来完成这个操作,但我想知道是否有其他人找到了其他方法。

2个回答

25

以下是完整的解决方案(适用于Celery 4+):

import celery
from celery.task import task

class MyBaseClassForTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # exc (Exception) - The exception raised by the task.
        # args (Tuple) - Original arguments for the task that failed.
        # kwargs (Dict) - Original keyword arguments for the task that failed.
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@task(name="foo:my_task", base=MyBaseClassForTask)
def add(x, y):
    raise KeyError()

资源:


这个 on_failure 回调函数是在任务失败时被调用,还是只有所有重试都失败时才会被调用? - Michael Pacheco
IDE抱怨未实现的抽象方法run(),我能直接让它保持未实现状态吗? - Michael Pacheco
是的,但我还需要创建一个异常(并通知)如果celery工作服务器宕机。 - kta

15
您可以直接将函数提供给装饰器:
def fun(self, exc, task_id, args, kwargs, einfo):
    print('Failed!')

@task(name="foo:my_task", on_failure=fun)
def add(x, y):
    raise KeyError()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接