Celery - 异步任务链式调用

3
我正在尝试开发一个概念证明,使用redis作为代理和celery进行异步任务链处理。
该程序是一个Flask API,具有/run和三个需要作为异步任务运行的函数。其中,从a()返回的结果是b()的参数,而从b()返回的结果是c()的参数,最终将数据通过一个名为“collection”的集合对象写入mongodb集合中。
@celery.task
def a(param):
    print("Original: {0}".format(param))
    print("Inside Task 1")
    param.update({"timestamp_A":str(datetime.timestamp), "result_A":True})
    print(param)
    return param

@celery.task
def b(param):
    print("Inside Task 2")
    param.update({"timestamp_B":str(datetime.timestamp), "result_B":True})
    print(param)
    return param

@celery.task
def c(param):
    print("Inside Task 3")
    collection.insert(dict(param))
    print("Output Saved to DB")


@app.route('/run', methods = ['GET'])
def run():
    if request.method != 'GET':
        return "HTTP Method not allowed"

    if request.method == 'GET':
        T = 1000
        for num in range(0, T):
            ds = {"test": num}
            chain(a.s(ds) | b.s() | c.s()).apply_async()
        return "Process Complete"

if __name__ == '__main__':
    app.run(debug=True)

使用上述代码,任务链就能正常工作,即a()会带着它的参数执行,但是要让函数b()执行,需要等待整个数据在a()中排队完成,然后才会执行b()。我希望只要任何一个任务a()被执行,它就应该立即交给b(),以此类推...有没有人能指出我可能犯了什么错误?

2个回答

1
我可能漏掉了什么,但似乎最简单的方法就是在上一个任务结束时调用下一个任务。
@celery.task
def a(arg):
  ret = calc(arg)
  b.apply_async(ret)

@celery.task
def b(arg):
  ret = calc(arg)
  c.apply_async(ret)

@celery.task
def c(arg):
  ret = calc(arg)
  mongo.store(ret)

这并不允许你在循环中有时调用a,有时不调用a,但你可以将任务包装在外部任务中,以同步方式运行内部部分。

从后台作业中生成后台任务是正常的吗?虽然它似乎可以控制流程,但如果b()是长时间运行的任务,它将绑定运行任务a()的进程ID,c()也是如此。这是否意味着当任务a完成时,Celery无法发出完成信号并继续占用其资源? - Shubham Mishra
不,如果在任务内部调用apply_async(),它与不调用的效果相同-它会向任务队列发送一条消息,以安排在下一个可用工作进程上调度作业。这意味着如果您依赖于celery结果api来跟踪'c'的完成情况,则无法使用'a'的结果,您必须传回生成的任务的任务ID。(但是,我也从未使用过结果api,而是依赖于数据存储中任务生成的日志和信号) - Paul Becotte
以上的步骤对我有效,并解决了所提出的问题。 - Shubham Mishra

1
根据您提供的描述,看起来您应该使用。链可以将任务分组在一起,将每个任务的返回值传递给链中的下一个任务,实现您所要求的功能。

嘿@MrName,没错!正如您在代码中所看到的,我已经使用了链。我遇到的问题是无法控制哪个任务以何种顺序执行。 例如,对于函数a(),会生成三个带有id a1、a2、a3等的任务,并且可能会生成多达a20个任务,我需要任务控制来为函数b()生成一个b2任务,如果任务a2成功,则如何理解控制流程? - Shubham Mishra
啊,我明白了,那很合理。抱歉,我没注意到你代码中的“chain”调用。好问题,我会尽力想出有用的回答。 - MrName

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接