如何在Celery中安排一系列任务的计划?

5

我希望能运行一个由beat计划的复杂任务。假设默认的add/mul任务已经定义好。

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        crontab(),
        add.s(2,3) | mul.s(2)
    )

但是在工作进程中这会返回一个错误:
NotImplementedError: chain is not a real task

如何使用celery beat调度非平凡任务?


我建议您使用beat配置并安排任务以按照您所喜欢的方式运行。另外,将beat作为独立的服务使用。 - DejanLekic
我已经使用了一个节拍计划(如上所述)。但是任务必须依赖于先前的结果,因此它是一个链式任务。然而,当我提供一个带有上述错误的链式任务时,Celery Beat会失败。 - Serge3
3
我会把那个链条放进一个任务中,并添加到beatconfig中。 - DejanLekic
1
按照@DejanLekic的建议,将链(或我的情况下的一组链)包装在任务中对我起了作用。唯一需要做的额外事情是在任务定义内部使用.delay()实际调用该组。如果没有这个,主任务会被成功安排,但链接的任务永远不会被触发。在 Celery 5.1.2 中测试通过。 - howaryoo
2个回答

6
一种方法是在您的 celeryconfig 中的 beat_schedule 中安排任务链,使用link选项,这里的celery_tasks是定义您的任务的模块名称。
from celery.schedules import crontab
from celery import signature

beat_schedule = {
    'chained': {
        'task': 'celery_tasks.add',
        'schedule': crontab(),
        'options': {
            'queue': 'default',
            'link': signature('celery_tasks.mul',
                        args=(),
                        kwargs={},
                        options={
                            'link': signature('celery_tasks.another_task', 
                                args=(),
                                kwargs={}, 
                                queue='default')
                        },
                        queue='default')
            },
         'args': ()
    }
}

我现在已经尝试过了。链接任务被接受,但工作进程没有执行。节拍服务状态为[2019-06-06 13:22:00,034: INFO/MainProcess] Scheduler: Sending due task chained (tasks.add),但工作进程根本没有反应。此外,我能够链接多个任务吗?最终我需要4个链接任务(对象加载器,dmap(分发到组),load_data,process_data)。 - Serge3
要运行更多的链接任务,您可以在先前的链接任务中的“选项”中添加“link”属性。我已经更新了我的答案来演示这一点。 - Greenev
不用客气!请注意,我最终在生产环境中使用了 Cron 来运行定期任务,因为我无法强制 celery-beat 正常工作,但也许在最新版本的 Celery 中已经修复了这个问题。 - Greenev
根据最新的文档(5.1.2): 实际上,链接执行选项被认为是一个内部原语,您可能不会直接使用它,而是使用链代替。 链接(回调/错误回调) - howaryoo
1
好的发现!原始解决方案是为celery 4.x设计的,因此可能有更好的解决方案适用于最新版本。 - Greenev
显示剩余2条评论

0

如果要添加链接的定期任务,您可以在声明链时使用 @app.task,然后将此新任务添加到 add_periodic_task() 方法中。例如:

@app.on_after_finalize.connect ->i use this because it`s declared on task.py
def setup_periodic_tasks(sender, **kwargs):                               
   sender.add_periodic_task(timedelta(minutes=10), chian_st22.s(),name='test')
                                

@app.task
def chian_st22(): -> i create the task with chain 
    cadena = chain(st22.s(), mailer.s()).apply_async()

@app.task
def mailer(data):
    clase = CheckAlert()
    mail = clase.envio_mail(data)
    return mail

@app.task
def st22():
    clase = CheckAlert()
    st = clase.check_st22_dumps()
    return st

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接