将任务的结果传递给Celery中的map函数

3
我觉得这是一个相当简单的用例,但我在文档中没有成功解决它。
以下是我的问题的虚构版本:
我有一个任务,接受一个参数并返回一个列表。我想对列表中的每个元素应用另一个任务。然后(根据情况)可能希望继续使用其他任务进行处理。
chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    ).apply_async().get()

chain(
    my.tasks.divide.s(data),
    my.tasks.conquer.map(),
    my.tasks.unite.s(),
    my.tasks.rule.s(),
    ).apply_async().get()

地图不是这样工作的,但我觉得它应该是这样的!我只想从一个参数链到一个列表,然后再返回...这肯定不难以正确地完成吧?

“解决方案” 这里这里 看起来过于复杂,我不相信它们是最佳实践。

文档 关于chords、maps和chains并没有帮助解决这个特定的问题。

编辑:到目前为止,我所做的是:问题在于我必须调用.apply_async().get()两次才能获得实际结果,因为中间步骤返回的是一个chord而不是实际结果:

@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def rule(items):
    return "".join(items)

@app.task()
def conquer_group(items):
    return group([conquer.s(x) for x in items])

@app.task()
def rule_group(items):
    return chord([conquer.s(x) for x in items], rule.s())

def main():
    print chain(
        divide.s("foobarbaz"),
        rule_group.s(),
        ).apply_async().get().apply_async().get()

你尝试过组合函数吗? - Maor Refaeli
1个回答

1
问题在于map()需要一个可迭代的对象来进行实例化。 这就是为什么可以使用(sample.map(range(100)) | another_task.s()).apply_async(),但无法实例化接受链式任务结果的map()签名。 我认为最简单的方法是在任务内部实例化map()签名。
@app.task()
def divide(item):
    return [x for x in item]

@app.task()
def conquer(item):
    return item.upper()

@app.task()
def process(items):
    return conquer.map(items)

def run():
    (divide.s("foobar") | process.s()).apply_async()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接