Celery - 链接远程回调

7
我有三个 Celery 任务,它们在三个不同的服务器上分别运行。
  • tasks.send_push_notification
  • tasks.send_sms
  • tasks.send_email
我想设置一个工作流程,如果发送推送通知失败,则尝试发送短信。如果发送短信失败,则应发送电子邮件。
如果这三个任务及其代码基础位于同一服务器上,我将遵循链接任务示例并执行类似操作。请参考:链接任务示例
from celery import chain
from tasks import send_push_notification, send_sms, send_email
import json
# some paylaod
payload = json.dumps({})
res = chain(
    send_push_notification.subtask(payload),
    send_sms.subtask(payload),
    send_email.subtask(payload)
)()

但是任务分布在3台不同的服务器上!

我已经尝试过

# 1
from celery import chain
from my_celery_app import app
res = chain(
    app.send_task('tasks.send_push_notification', payload),
    app.send_task('tasks.send_sms', payload),
    app.send_task('tasks.send_email', payload)
)()
# Which fails because I am chaining tasks not subtasks

并且

# 2
from celery import chain, subtask
res = chain(
    subtask('tasks.send_push_notification', payload), 
    subtask('tasks.send_sms', payload), 
    subtask('tasks.send_email', payload)
)()
# fails because I am not adding the tasks on the broker

这个怎么做呢?

更新: 我可以使用 link 而不是 chain 来实现。

from celery import subtask
res = app.send_task(
    'tasks.send_push_notification', (payload, ),
    link=subtask(
        'tasks.send_sms', (payload, ),
        link=subtask(
            'tasks.send_email', (payload, ),
        )
    )
)

这里涉及到很多嵌套。而且因为我实际上需要创建一个基于数据库的工作流程,所以用这种方式创建会变得很复杂。

1个回答

0

为什么不在你的任务中处理它呢?

def push_notification_task(payload):
    if not send_push_notification(payload):
        sms_notification_task.delay(payload)

def sms_notification_task(payload):
    if not send_sms_notification(payload):
        email_notification_task.delay(payload)

def email_notification_task(payload):
    send_email_notification(payload)

此外,chain 将按照给定的顺序执行所有任务,而您希望下一个任务仅在第一个任务失败时才运行。

谢谢回复。我使用chain是因为我想遵循“责任链”设计模式。你的解决方案行不通,因为这三个任务/它们的代码库不在一个地方。它们位于远程位置。 - Hussain
我的最终目标基本上是创建一个基于数据库/配置的工作流,用于推送通知、短信、电子邮件等操作。 - Hussain
无论任务的代码库在哪里,只要您的Celery工作者使用相同的Broker,这种方法就应该可以工作,因为“ .delay”将任务添加到broker队列中,而工作者将在完成当前任务后从队列中取出它。 - Muhammad Tahir

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接