Celery: launch the chord callback as soon as its associated tasks finish

When I launch a chord() with a group of tasks and a callback, the callback is only invoked once all the groups of tasks have finished, including tasks that are not part of that chord.
Here is some code that illustrates the issue:
import time

from celery import Celery, group, chord

app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task(name='SHORT_TASK')
def short_task(t):
    time.sleep(t)
    return t

@app.task(name='FINISH_GROUP')
def finish_group(res, nb):
    print("Pipe #{} finished".format(nb))
    return True

@app.task
def main(total):
    for nb in range(1, total+1):
        # one chord per "pipe": three short tasks, then the callback
        short_tasks = [short_task.si(i) for i in [0.5, 0.75, 1]]

        chord(
            group(short_tasks),
            finish_group.s(nb)
        ).apply_async()

As an example, I launched it with 5 items:

In [5]: main.delay(5)
Out[5]: <AsyncResult: db1f97f0-ff7a-4651-b2f9-11e27a001480>

Here is the result:
[2017-11-06 13:50:38,374: INFO/MainProcess] Received task: tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d]  
[2017-11-06 13:50:38,409: INFO/MainProcess] Received task: SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e]  
[2017-11-06 13:50:38,411: INFO/MainProcess] Received task: SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c]  
[2017-11-06 13:50:38,412: INFO/MainProcess] Received task: SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186]  
[2017-11-06 13:50:38,414: INFO/MainProcess] Received task: SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb]  
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2]  
[2017-11-06 13:50:38,416: INFO/MainProcess] Received task: SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f]  
[2017-11-06 13:50:38,419: INFO/MainProcess] Received task: SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093]  
[2017-11-06 13:50:38,420: INFO/MainProcess] Received task: SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e]  
[2017-11-06 13:50:38,421: INFO/MainProcess] Received task: SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2]  
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f]  
[2017-11-06 13:50:38,423: INFO/MainProcess] Received task: SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e]  
[2017-11-06 13:50:38,423: INFO/ForkPoolWorker-3] Task tasks.main[6da738b5-4eae-4de4-9ac5-1dc67d210f1d] succeeded in 0.048569423001026735s: None
[2017-11-06 13:50:38,424: INFO/MainProcess] Received task: SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5]  
[2017-11-06 13:50:38,427: INFO/MainProcess] Received task: SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39]  
[2017-11-06 13:50:38,918: INFO/ForkPoolWorker-2] Task SHORT_TASK[9581f9e0-1128-4b87-ae6b-16f238b2337e] succeeded in 0.5051485379808582s: 0.5
[2017-11-06 13:50:38,926: INFO/ForkPoolWorker-3] Task SHORT_TASK[a1208f06-250f-48ac-b3df-45c4525fe8eb] succeeded in 0.5012409449846018s: 0.5
[2017-11-06 13:50:39,165: INFO/ForkPoolWorker-1] Task SHORT_TASK[579dc498-3770-4385-a25a-06173fbe639c] succeeded in 0.7524393269850407s: 0.75
[2017-11-06 13:50:39,445: INFO/ForkPoolWorker-4] Task SHORT_TASK[bfafb943-46d8-42e3-941f-b48a9c8e0186] succeeded in 1.031865488999756s: 1
[2017-11-06 13:50:39,448: INFO/MainProcess] Received task: FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998]  
[2017-11-06 13:50:39,668: INFO/ForkPoolWorker-1] Task SHORT_TASK[7551199b-4690-45dd-a434-3911861f0093] succeeded in 0.501304400007939s: 0.5
[2017-11-06 13:50:39,672: INFO/ForkPoolWorker-2] Task SHORT_TASK[86ee7408-9d61-4909-bce8-c42cf691e9c2] succeeded in 0.7513346789928619s: 0.75
[2017-11-06 13:50:39,932: INFO/ForkPoolWorker-3] Task SHORT_TASK[e2bb22c0-1d20-4da7-91d9-45b7ed8bfc6f] succeeded in 1.0058077470166609s: 1
[2017-11-06 13:50:39,936: INFO/MainProcess] Received task: FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a]  
[2017-11-06 13:50:40,175: INFO/ForkPoolWorker-2] Task SHORT_TASK[3ac997f5-6d0f-43b6-ab15-a6827a26665f] succeeded in 0.502920284983702s: 0.5
[2017-11-06 13:50:40,198: INFO/ForkPoolWorker-4] Task SHORT_TASK[362d18f4-2252-4a31-ad21-4a2d192fd22e] succeeded in 0.752579735009931s: 0.75
[2017-11-06 13:50:40,685: INFO/ForkPoolWorker-3] Task SHORT_TASK[8b2ebb3a-293c-4bb8-88a3-5242750a082e] succeeded in 0.7518302960088477s: 0.75
[2017-11-06 13:50:40,701: INFO/ForkPoolWorker-4] Task SHORT_TASK[4e82540e-f935-4288-828f-c6f66f84139a] succeeded in 0.5013290829956532s: 0.5
[2017-11-06 13:50:40,715: INFO/ForkPoolWorker-1] Task SHORT_TASK[7561c33b-7020-4feb-b054-3919e4ae31c2] succeeded in 1.0464465210097842s: 1
[2017-11-06 13:50:40,715: WARNING/ForkPoolWorker-1] Pipe #1 finished
[2017-11-06 13:50:40,716: INFO/ForkPoolWorker-1] Task FINISH_GROUP[4506631f-f9cc-4e9e-a9e7-9a59c8f7c998] succeeded in 0.000513697013957426s: True
[2017-11-06 13:50:40,716: WARNING/ForkPoolWorker-1] Pipe #2 finished
[2017-11-06 13:50:40,717: INFO/ForkPoolWorker-1] Task FINISH_GROUP[f143d581-799b-45ff-9e11-edc0bb88006a] succeeded in 0.0003622350050136447s: True
[2017-11-06 13:50:40,718: INFO/MainProcess] Received task: FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a]  
[2017-11-06 13:50:40,718: WARNING/ForkPoolWorker-1] Pipe #3 finished
[2017-11-06 13:50:40,718: INFO/ForkPoolWorker-1] Task FINISH_GROUP[fc9be8c2-99f7-46b2-a810-47023e0a072a] succeeded in 0.00038264598697423935s: True
[2017-11-06 13:50:41,215: INFO/ForkPoolWorker-2] Task SHORT_TASK[efd68688-ec4a-418d-83b9-a55fd6cc1541] succeeded in 1.0379863310081419s: 1
[2017-11-06 13:50:41,219: INFO/MainProcess] Received task: FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847]  
[2017-11-06 13:50:41,221: WARNING/ForkPoolWorker-2] Pipe #4 finished
[2017-11-06 13:50:41,222: INFO/ForkPoolWorker-2] Task FINISH_GROUP[6a4dc66e-2232-4bad-9d85-9fbc63b8b847] succeeded in 0.0018843600118998438s: True
[2017-11-06 13:50:41,440: INFO/ForkPoolWorker-3] Task SHORT_TASK[a94e4ec4-adcb-4a0f-b184-b36650105ed5] succeeded in 0.7531412789830938s: 0.75
[2017-11-06 13:50:41,708: INFO/ForkPoolWorker-4] Task SHORT_TASK[0d4b5e24-7aaa-4eb8-8e54-70e769dfdb39] succeeded in 1.005872479028767s: 1
[2017-11-06 13:50:41,711: INFO/MainProcess] Received task: FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff]  
[2017-11-06 13:50:41,712: WARNING/ForkPoolWorker-3] Pipe #5 finished
[2017-11-06 13:50:41,712: INFO/ForkPoolWorker-3] Task FINISH_GROUP[388ee1c3-b80c-41af-bbfd-29b968e90aff] succeeded in 0.0005500270053744316s: True

I started a single Celery worker with a concurrency of 4 (prefork).

At the beginning we can see the 15 SHORT_TASKs being received; the worker then executes them, and only after that are the FINISH_GROUP tasks called.

Is it possible to launch each FINISH_GROUP task as soon as its related SHORT_TASKs are done, instead of waiting for all the other unrelated SHORT_TASKs to finish?

Maybe my canvas is wrong, or it is a Celery misconfiguration, I don't know.

Thanks for your help!

1 Answer

Your test results are skewed because you only use a single worker and time.sleep() blocks its processes: with a concurrency of 4, at most four SHORT_TASKs can sleep at once, so the remaining ones have to wait in the queue.
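A quick way to confirm this (assuming your module is named tasks, as the logs suggest) is to give the single worker enough prefork processes for all 15 SHORT_TASKs to sleep in parallel:

$ celery worker -A tasks --loglevel=info --concurrency=16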

Is it possible to launch the FINISH_GROUP task as soon as its related SHORT_TASKs are done, instead of waiting for all the other unrelated SHORT_TASKs?

Currently you are not waiting for the other short_tasks to finish: they are all scheduled at the same time, and because of the sleep each finish_group is only called once all the short_tasks of its own chord have ended.
Your current execution looks like this:
| chord 1      | chord 2      | chord 3      |
|--------------|--------------|--------------|
| short_task 1 |              |              |      |
|              | short_task 1 |              |      |
|              |              | short_task 1 |      |
| short_task 2 |              |              |      |
|              | short_task 2 |              |      |
|              |              | short_task 2 |      |
| short_task 3 |              |              |      v
|              | short_task 3 |              | execution order
|              |              | short_task 3 |
| finish_group |              |              |
|              | finish_group |              |
|              |              | finish_group |

If you remove the sleep, add more workers, or use gevent, it should look like this:
| chord 1          | chord 2          | chord 3          |
|------------------|------------------|------------------|
| short_task 1,2,3 | short_task 1,2,3 | short_task 1,2,3 |
| finish_group     | finish_group     | finish_group     |

In the logs you should see the tasks of a given row appear in slightly different orders (depending on which worker picked them up first), but finish_group always comes last.


Note that grouping the tasks is unnecessary when using a chord; passing the list directly works:

chord(
    short_tasks,
    finish_group.s(nb)
)
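A plain list passed as the chord header is promoted to a group implicitly. The chord can also be invoked directly instead of going through apply_async(); a minimal, equivalent sketch:

# same effect as chord(short_tasks, finish_group.s(nb)).apply_async()
chord(short_tasks)(finish_group.s(nb))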

The same code using gevent:

import gevent
from celery import Celery, chord

app = Celery('tasks', broker='redis://localhost/4', backend='redis://localhost/5')


@app.task()
def short_task(nb, i):
    print('TEST: start short_task({}, {})'.format(nb, i))
    gevent.sleep(1)
    print('TEST: end   short_task({}, {})'.format(nb, i))
    return i


@app.task(name='FINISH_GROUP')
def finish_group(results, nb):
    print('TEST: finish_group({}) -> {}'.format(nb, results))


@app.task
def main(total):
    for nb in range(1, total+1):
        short_tasks = [short_task.si(nb, i) for i in range(3)]

        chord(short_tasks, finish_group.s(nb)).apply_async()

Started with the following command:

$ celery worker -A celery_test --loglevel=debug --concurrency=20 -P gevent 2>&1 | grep TEST
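Note that the -P gevent pool requires the gevent package to be installed, and that chords need a result backend to collect the header results, which is why the app above sets backend='redis://localhost/5'.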

Parallel execution interleaves the output:
[2017-11-06 16:40:08,085] TEST: start short_task(1, 0)
[2017-11-06 16:40:08,088] TEST: start short_task(1, 1)
[2017-11-06 16:40:08,091] TEST: start short_task(1, 2)
[2017-11-06 16:40:08,092] TEST: start short_task(2, 0)
[2017-11-06 16:40:08,094] TEST: start short_task(2, 1)
[2017-11-06 16:40:08,096] TEST: start short_task(2, 2)
[2017-11-06 16:40:08,100] TEST: start short_task(3, 0)
[2017-11-06 16:40:08,101] TEST: start short_task(3, 1)
[2017-11-06 16:40:08,103] TEST: start short_task(3, 2)
# ^ all short_tasks have been started at the same time

[2017-11-06 16:40:09,085] TEST: end   short_task(1, 0)
[2017-11-06 16:40:09,089] TEST: end   short_task(1, 1)
[2017-11-06 16:40:09,093] TEST: end   short_task(1, 2)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 0)
[2017-11-06 16:40:09,106] TEST: end   short_task(2, 1)
[2017-11-06 16:40:09,107] TEST: end   short_task(2, 2)
[2017-11-06 16:40:09,107] TEST: end   short_task(3, 0)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 1)
[2017-11-06 16:40:09,108] TEST: end   short_task(3, 2)
# ^ total execution takes only 1 second since the 9 greenlets slept concurrently

[2017-11-06 16:40:09,115] TEST: finish_group(1) -> [0, 1, 2]
[2017-11-06 16:40:09,126] TEST: finish_group(2) -> [2, 1, 0]
[2017-11-06 16:40:09,128] TEST: finish_group(3) -> [0, 1, 2]
# ^ the order of the results is mixed, depending on which greenlet finished first
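If the caller also wants the callback's return value, the AsyncResult returned by apply_async() refers to the finish_group task; a minimal sketch, assuming the Redis backend configured above:

res = chord(short_tasks, finish_group.s(1)).apply_async()
res.get(timeout=10)  # waits for this chord's finish_group to complete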

ref: how to chain multiple chord groups, https://dev59.com/WB_9s4cB2Jgan1znmYWI#51097747 - Charlie 木匠
