Celery任务调度(Celery,Django和RabbitMQ)

10

我希望有一个任务每5分钟执行一次,但它会等待上一次执行完成后才开始计时这5分钟。(这样我也可以确保只有一个任务在运行)我找到的最简单的方法是运行Django应用程序manage.py shell并运行以下命令:

while True:
    result = task.delay()
    result.wait()
    sleep(5)

但是对于我想要以这种方式执行的每个任务,我都必须运行它自己的shell,有没有简单的方法可以做到这一点?也许可以使用自定义定时程序或Django Celery调度程序?

6个回答

18

哇,令人惊讶的是没有人理解这个人的问题。他们不是询问如何定期运行任务,而是如何确保 Celery 不同时运行相同任务的两个实例。我不认为有一种直接使用 Celery 实现这个目标的方法,但你可以让其中一个任务在开始时获取锁,并如果获取失败,则在几秒钟后重试(使用 retry)。任务在返回之前将释放锁;如果它崩溃或超时,您可以使锁自动过期几分钟。

对于锁定,您可能只需使用数据库或类似 Redis 的东西。


+1. 唯一解决独特实例问题的人!如果您正在使用Django数据库,可以在此处找到有关如何实现锁定的详细信息:https://dev59.com/dm855IYBdhLWcg3w5Igb - jcdude

16

您可能会对这种更简单的方法感兴趣,它不需要对 celery 配置进行任何更改。

@celery.decorators.periodic_task(run_every=datetime.timedelta(minutes=5))
def my_task():
    # Insert fun-stuff here

1
我遇到了一个错误:“Celery”对象没有“decorators”属性。你有什么想法吗?我在我的任务上面写了@celery.decorators.periodic_task(run_every=datetime.timedelta(minutes=5))。 - Always_a_learner
最新版本的celery没有这个装饰器。你必须使用这里的说明:http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html - Conley Owens

13

你只需要在celery配置中指定想要定期运行的任务和时间间隔。

例如:每30秒运行tasks.add任务。

from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    "runs-every-30-seconds": {
        "task": "tasks.add",
        "schedule": timedelta(seconds=30),
        "args": (16, 16)
     },
}

记得要使用 -B 选项在 beat 模式下运行 celery。

manage celeryd -B

除了时间间隔,您还可以使用crontab风格的方式,方法如下:

http://ask.github.com/celery/userguide/periodic-tasks.html

如果您正在使用django-celery,请记住您也可以使用django数据库作为周期性任务的调度程序。这样一来,您就可以通过django-celery管理面板轻松添加新的周期性任务。 要这样做,您需要在settings.py中设置celerybeat调度程序,方法如下:

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"

2
这样做的问题在于它不会等待任务完成,而是在时间到了之后(每30秒)就会发送另一个任务。或者我错了吗? - Julian Popov
谢谢您的建议,但我想要另外一种方式——我想创建一个作业,将其发送以执行,并且仅在前一个作业的执行完成后才创建另一个作业。在我确定前一个作业已完成之前,我不想创建新的作业。我希望任务具有同步(而非异步)行为。 - Julian Popov
全局目标是运行一个任务,我无法确定它需要多长时间完成,当它完成后等待一段时间再重新开始。同时,我必须确保它不会被不同的工作线程同时执行2次或更多次,并且我不必编写自己的程序代码来实现这一点。 - Julian Popov
2
如果你想确保一个任务只在上一个任务结束后才开始,可以使用memcached(或django cache)在该任务的任务类型或资源上创建锁。这很容易且可扩展。 - michael
@MauroRocco 这是不正确的,至少在3.0.12版本中,celery beat肯定会创建重叠任务。 - Alex B

4

为了进一步解释@MauroRocco的帖子,从http://docs.celeryproject.org/en/v2.2.4/userguide/periodic-tasks.html开始

使用时间差来安排任务意味着任务将在celerybeat启动后30秒执行,然后在上次运行后每30秒执行一次。也存在类似于crontab的计划表,详见Crontab计划表部分。

因此,这确实可以实现您想要的目标。


抱歉,关于这个问题,如果任务需要20秒才能完成,它会在0:30(第一次)运行,然后在0:50完成并在1:20开始运行(这正是我想要的)。 - Julian Popov
2
如果您希望任务每30秒独立于持续时间运行,则必须使用crontab计划,但请记住,这些任务会添加到celery队列中,如果有其他任务正在执行/在队列中,则不能确定您的任务是否按给定时间开始。 - Mauro Rocco

2
由于 celery.decorators 已经弃用,您可以使用 periodic_task 装饰器来实现定时任务,示例如下:
from celery.task.base import periodic_task
from django.utils.timezone import timedelta

@periodic_task(run_every=timedelta(seconds=5))
def my_background_process():
    # insert code

0
将该任务添加到一个单独的队列中,然后使用一个并发选项设置为1的独立工作程序来处理该队列。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接