Celery apply_async 的回调函数

15
我在我的应用中使用celery来运行定期任务。下面是一个简单的示例。
from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise
正如您在上面的示例中看到的那样,我使用celery来运行异步任务,但(由于它是一个队列),在do_stuff发生异常时需要执行queue.fail(uid),否则需要执行queue.ack(uid)。在这种情况下,在任务成功和失败的情况下都有一些回调将非常清晰和有用。文档中提到了一些内容,但从未见过使用apply_async进行回调的实践。这可行吗?
2个回答

49

创建Task类的子类并重载on_success和on_failure函数:

from celery import Task


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        '''
        retval – The return value of the task.
        task_id – Unique id of the executed task.
        args – Original arguments for the executed task.
        kwargs – Original keyword arguments for the executed task.
        '''
        pass
        
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        '''
        exc – The exception raised by the task.
        task_id – Unique id of the failed task.
        args – Original arguments for the task that failed.
        kwargs – Original keyword arguments for the task that failed.
        '''
        pass

使用:

@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

1
如何像args和kwargs一样将参数传递给on_success或on_failure? - iamabhaykmr
1
这不会在任务调度程序端回调。 - Roby
那么,一旦Celery工作进程成功完成,它会调用on_success方法吗? - sattva_venu
@sattva_venu 是的,请确保重新启动你的 celery 进程。 - Rakka Alhazimi

8

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接