Celery: 任务层次结构后的回调函数

10

我正在使用Celery从Web应用程序启动任务层次结构。

任务

我使用以下任务:

  • task_a
  • task_b
  • task_c
  • notify_user

一个Django视图启动多个task_a实例。每个实例都进行一些处理,然后启动多个task_b实例,每个实例也进行一些处理,然后启动多个task_c实例。

可视化如下:

Tree

目标

我的目标是执行所有任务,并在整个层次结构完成时立即运行回调函数。此外,我希望能够将数据从最低级别的任务传递到顶层。

  1. 视图只需“启动”任务,然后返回。
  2. 每个子任务依赖于父任务,而父任务不直接依赖于子任务。 父任务启动了所有子任务后,它可以停止。
  3. 可以并行处理所有内容,只要在启动子任务之前运行父任务即可。
  4. 所有任务完成后,应调用notify_user回调函数。
  5. notify_user回调函数需要访问task_c的数据。

所有任务都应该是非阻塞的,因此task_b不应等待所有task_c子任务完成。

实现上述目标的正确方法是什么?

2个回答

4
解决方案是在这次拉取请求中提供的动态任务功能:https://github.com/celery/celery/pull/817。使用此功能,每个任务都可以返回一组子任务,然后这些子任务将替换队列中的原始任务。

Danilo提到的动态任务功能非常强大。虽然它没有被纳入当前(3.0.21)版本的Celery中,但可以轻松地添加到您的本地Celery部署中。 - Chris Johnson
那个 PR 看起来好像从未合并过,并且 Celery 文档中没有提到“动态”。 - odigity

2
假设您有以下任务:
celery = Celery(
    broker="amqp://test:test@localhost:5672/test"
)
celery.conf.update(
    CELERY_RESULT_BACKEND = "mongodb",
)


@celery.task
def task_a(result):
    print 'task_a:', result
    return result

@celery.task
def task_b(result):
    print 'task_b:', result
    return result

@celery.task
def task_c(result):
    print 'task_c:', result
    return result

@celery.task
def notify_user(result):
    print result
    return result

对于给定的输入数据(如您所绘制的):

    tree = [
        [["C1", "C2", "C3"], ["C4", "C5"]], [["C6", "C7", "C8"], ["C9"]]
    ]

您可以做以下事情:

    a_group = []
    for ia, a in enumerate(tree):
        print "A%s:" % ia
        b_group = []
        for ib, b in enumerate(a):
            print " - B%s:" % ib
            for c in b:
                print '   -', c

            c_group = group([task_c.s(c) for c in b])

            b_group.append(c_group | task_b.s())

        a_group.append(group(b_group) | task_a.s())

    final_task = group(a_group) | notify_user.s()

它的表示形式是(不要读它,很丑 :))
[[[__main__.task_c('C1'), __main__.task_c('C2'), __main__.task_c('C3')] | __main__.task_b(), [__main__.task_c('C4'), __main__.task_c('C5')] | __main__.task_b()] | __main__.task_a(), [[__main__.task_c('C6'), __main__.task_c('C7'), __main__.task_c('C8')] | __main__.task_b(), [__main__.task_c('C9')] | __main__.task_b()] | __main__.task_a()] | __main__.notify_user()

传递给notify_user的数据将是:

[[['C1', 'C2', 'C3'], ['C4', 'C5']], [['C6', 'C7', 'C8'], ['C9']]]

所有的操作都是通过回调函数(chords)运行,因此没有任务等待其他任务。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接