我正在尝试开发一个概念证明,使用redis作为代理和celery进行异步任务链处理。
该程序是一个Flask API,具有/run和三个需要作为异步任务运行的函数。其中,从a()返回的结果是b()的参数,而从b()返回的结果是c()的参数,最终将数据通过一个名为“collection”的集合对象写入mongodb集合中。
该程序是一个Flask API,具有/run和三个需要作为异步任务运行的函数。其中,从a()返回的结果是b()的参数,而从b()返回的结果是c()的参数,最终将数据通过一个名为“collection”的集合对象写入mongodb集合中。
@celery.task
def a(param):
print("Original: {0}".format(param))
print("Inside Task 1")
param.update({"timestamp_A":str(datetime.timestamp), "result_A":True})
print(param)
return param
@celery.task
def b(param):
print("Inside Task 2")
param.update({"timestamp_B":str(datetime.timestamp), "result_B":True})
print(param)
return param
@celery.task
def c(param):
print("Inside Task 3")
collection.insert(dict(param))
print("Output Saved to DB")
@app.route('/run', methods = ['GET'])
def run():
if request.method != 'GET':
return "HTTP Method not allowed"
if request.method == 'GET':
T = 1000
for num in range(0, T):
ds = {"test": num}
chain(a.s(ds) | b.s() | c.s()).apply_async()
return "Process Complete"
if __name__ == '__main__':
app.run(debug=True)
使用上述代码,任务链就能正常工作,即a()会带着它的参数执行,但是要让函数b()执行,需要等待整个数据在a()中排队完成,然后才会执行b()。我希望只要任何一个任务a()被执行,它就应该立即交给b(),以此类推...有没有人能指出我可能犯了什么错误?