Celery、Celerybeat和Django-Celery-Beat能否在运行时动态添加/删除任务,而无需重新启动Celerybeat?

14

我尝试了所有能找到的方法,包括:

stackoverflow

如何动态添加/删除Celery(celerybeat)的定期任务

Celery celerybeat是否可以在运行时动态添加/删除任务

Github问题

如何动态地向Celerybeat添加或删除任务?

我从上面得到的结论是,如果我只使用Celery和Celery beat,我必须在添加/删除任务后重新启动Celery beat。但是,如果我使用django-celery-beat,则无需重新启动它。

我按照文档逐步操作:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.config_from_object('celeryconfig')
app.conf.timezone = 'UTC'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

我的 celeryconfig

BROKER_URL = 'amqp://rabbit'
CELERY_RESULT_BACKEND = 'rpc://rabbit'
CELERY_RESULT_PERSISTENT = True
# CELERY_ACKS_LATE = True
CELERY_DEFAULT_DELIVERY_MODE = 2
CELERY_TASK_RESULT_EXPIRES = 3600
CELERYBEAT_SCHEDULER ="django_celery_beat.schedulers:DatabaseScheduler"

我的celery beat运行命令

celery -A tasks beat -l info -S django

这个很好用,任务按预期运行。之后,我编写了一个脚本在运行时添加任务。

import django
django.setup()
from tasks import app, setup_periodic_tasks
from django_celery_beat.models import PeriodicTask, CrontabSchedule


crontab = CrontabSchedule.objects.create(
       minute='*/1',
       hour='*',
       day_of_week='*',
   )

period = PeriodicTask.objects.create(
       name='testfasd',
       kwargs={},
       crontab=crontab,
       task='tasks.test',
   )

setup_periodic_tasks(app)

当我查看数据库时,我得到了我期望的结果,即有新的记录,并且last_update字段已经更新。而celery beat中的日志也证明了这一点。

[2016-12-20 17:37:21,796: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:21,840: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)
[2016-12-20 17:37:31,848: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2016-12-20 17:37:31,851: INFO/MainProcess] Writing entries...
[2016-12-20 17:37:31,930: INFO/MainProcess] Scheduler: Sending due task add every 10 (tasks.test)

我的问题是,尽管celery beat知道数据库已更改,但它仍然发送旧任务,并且不会将新任务发送给工作进程。有任何想法吗?

更新

我在我的项目中使用docker,可能与此有关。


我也尝试了同样的操作,遇到了相同的错误。实际上,你和我并不孤单。在这里,你可以看到更多关于相关问题的评论:https://github.com/celery/django-celery-beat/issues/7 - Alexander Tyapkov
我认为这个库不是你想要的那种。这种问题通常涉及“生命周期”,意味着应用程序启动、使用配置,然后进入就绪状态。就绪状态是简单和一致的,因为它是不可变的。如果你添加和删除东西,会使工程变得困难(锁、信号量、这段代码是否可重入?这些哲学家吃什么?等等)。 - nsfyn55
谢谢nsfyn,所以我每次更改任务的时候唯一的方法就是重新启动它? - Windsooon
我不能确定地说,但看起来是这样。我在文档中没有看到任何迹象表明您所需的路径可以在不对celery的应用程序生命周期进行重大修改的情况下实现。 - nsfyn55
3个回答

1

这个github问题

[你无法在celerybeat中添加或删除任务],目前必须重新启动beat。

不可以。为了刷新celery[beat]内的任务或任务时间,你必须重新启动celery[beat]实例。任务在运行时加载到内存中。为了更改/添加任务,你必须刷新实例。

你可以考虑使用自我重复任务、自定义时间和条件执行。例如:

from datetime import timedelta
from celery import shared_task

@shared_task
def check_conditions():
    # Do some db-level code
    if condition:
        check_conditions.apply_async(eta=timedelta(hours=6))

我在生产环境中使用它,效果良好。
如果您需要重新加载任务,只需编程重启celery[beat]即可。
@shared_task
def autoreload():
    if condition:
        execute_shell_code_to_restart_celery()

我没有使用过这个东西,不能保证其可用性,但从理论上来说应该可以工作。 这个GitHub问题 我必须重新加载beat才能使这些更改在worker上更新...使用django-celery-beat...这个问题仍然存在于4.0.2和主分支上,在2016年12月21日进行了测试。

0

我曾经遇到和你一模一样的问题。 问题出在django-celery-beat包中使用了DatabaseScheduler,它的实现方式是,如果你想动态管理周期性任务(添加、删除、更新),而不必每次添加任务都重新启动celery beat,并且如果你愿意使用Redis,你可以使用这个库https://pypi.org/project/celery-redbeat/。 如果你想更好地控制你的任务,我也写了一个基于它的库,可以与Django原生配合使用https://pypi.org/project/django-redbeat/


0

警告

如果您更改了Django的TIME_ZONE设置,则您的定期任务计划仍将基于旧时区。

要解决这个问题,您需要重置每个定期任务的“上次运行时间”:

 from django_celery_beat.models import PeriodicTask, PeriodicTasks
 PeriodicTask.objects.all().update(last_run_at=None)
 for task in PeriodicTask.objects.all():
     PeriodicTasks.changed(task)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接