如何在Celery中设置回调函数

8
我希望给一个函数添加回调,这样当函数返回时可以调用一个常规的Python函数。
我的任务。
@celery.task                                                                                                                            
def add(x, y):                                                                                                                         
    return x + y 

How I want to use it:

from __future__ import print_function
delay.add(2, 3 ,callback=lambda x: print x) 

可以使用任何未在celery任务中定义但从任务调用的函数,而非lambda函数。

1个回答

5
在这种情况下,您只能链接任务:
add.apply_async((2, 3), link=other_task.s())

这与以下代码相同:

(add.s(2, 3) | other_task.s())()

等待任务完成会使任务变为同步,因此您想要的调用将等效于:

(lambda x: print(x))(add.delay(2, 3).get())

这将阻止当前进程,直到任务返回。如果您不想进程被阻止,则必须编写专用线程等待结果并调用回调函数。

或者您可以使用eventlet/gevent库编写普通代码。


2
回调函数一定要定义为任务吗?因为如果我有一个复杂的图,有很多必须声明为任务的依赖节点,那么所有应用程序逻辑都将在tasks.py中,这是不理想的... - Clara
我正在尝试在我的代码中使用回调函数send_email.apply_async( args = [json.dumps(payload)], exchange = CELERY_EXCHANGE, routing_key = CELERY_ROUTING_KEY, link = save_email_response.subtask((invite_id,candidate.id,)) ),但它没有调用save_email_response。 我已经使用rdb进行了验证。 - Hussain
尝试过 (send_email.s(args=[json.dumps(payload)]) | save_email_response.s(invite_id, candidate.id))(exchange=CELERY_EXCHANGE, routing_key=CELERY_ROUTING_KEY),但没有成功。 - Hussain

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接