如何使用Celery和Django编程生成Celerybeat条目

12

我希望能够以编程方式生成celerybeat条目并在添加条目时重新同步celerybeat。文档here中指出:

默认情况下,条目是从CELERYBEAT_SCHEDULE设置中获取的,但也可以使用自定义存储,例如将条目存储在SQL数据库中。

因此,我正在尝试弄清楚需要扩展哪些类才能实现此目的。

我一直在查看celery scheduler docsdjcelery api docs,但这些方法的一些文档都不存在,所以即将深入了解一些源代码,希望有人可以指点我正确的方向。

我想一个高层次的描述会有所帮助...作为用户,我需要能够从预定义的任务集中选择,并提供一种让用户选择某种自定义计划来执行它的方法,比如每天/每周/每月以及何时执行。

另外,这是在Django中使用djcelery。

更新

我看到了djcelery admin的代码,但不清楚数据是如何持久化的。我目前有一个通用的addTask视图,它看起来像这样:

def addTask(request):

intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
intervalSchedule.save()
modelData = dict(
    name="dcTestPersist",
    task="technologytrackerapi.views.createRecord",
    schedule=intervalSchedule,
)
periodicTask = PeriodicTask(**modelData)
periodicTask.save()
return render_to_response('taskView.html')
数据在数据库中看起来是正确的,但当守护程序运行时,会出现以下错误:

[2012-03-06 00:23:07,926: WARNING/Beat] Process Beat:
[2012-03-06 00:23:07,926: WARNING/Beat] Traceback (most recent call last):
[2012-03-06 00:23:07,926: WARNING/Beat] File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
[2012-03-06 00:23:07,926: WARNING/Beat] self.run()
[2012-03-06 00:23:07,927: WARNING/Beat] File "/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py", line 464, in run
[2012-03-06 00:23:07,927: WARNING/Beat] self.service.start(embedded_process=True)
[2012-03-06 00:23:07,927: WARNING/Beat] File "/home/dchesterman/Documents/PythonDev /.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py", line 403, in start
[2012-03-06 00:23:07,927: WARNING/Beat] interval = self.scheduler.tick()
[2012-03-06 00:23:07,927: WARNING/Beat] File "/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py", line 194, in tick
[2012-03-06 00:23:07,927: WARNING/Beat] next_time_to_run = self.maybe_due(entry, self.publisher)
[2012-03-06 00:23:07,927: WARNING/Beat] File "/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/celery/beat.py", line 170, in maybe_due
[2012-03-06 00:23:07,927: WARNING/Beat] is_due, next_time_to_run = entry.is_due()
[2012-03-06 00:23:07,928: WARNING/Beat] File "/home/dchesterman/Documents/PythonDev/.virtualenvs/ros/local/lib/python2.7/site-packages/djcelery/schedulers.py", line 54, in is_due
[2012-03-06 00:23:07,928: WARNING/Beat] return self.schedule.is_due(self.last_run_at)
[2012-03-06 00:23:07,928: WARNING/Beat] AttributeError: 'NoneType' object has no attribute 'is_due'

我不确定为什么我的计划表没有使用默认的is_due()
2个回答

6
这是我最终采用的解决方案:
def addTask(request):

  intervalSchedule = IntervalSchedule.from_schedule(schedule(timedelta(seconds=10)))
  intervalSchedule.save()

  modelData = dict(
      name="dcTestPersist",
      task="technologytrackerapi.tasks.createRecord",
      interval_id=intervalSchedule.pk,
  )

  periodicTask = PeriodicTask(**modelData)
  periodicTask.save()

  me = ModelEntry(periodicTask)

  try:
      me.save()

  except:
    from django.db import connection
    print connection.queries
    raise

  return render_to_response('taskView.html')

我需要将定期任务封装在ModelEntry中。


因为某些我不理解的原因,我按照这个方式创建了我的定时任务,并且不需要用 ModelEntry 进行包装。不知道为什么或者 ModelEntry 是什么。随机记录 - 我也在设置 expires 并且感到困惑它没有效果。它确实有一个效果,但是作用于 celeryd 而不是 celerybeat - celerybeat 继续无限触发,但在 expires 之后 celeryd 忽略它(状态 revoked)。 - Chris
这是 djcelery 的一部分,它与 Django 集成。不确定您是否在使用它。 https://github.com/celery/django-celery 我会怀疑。看起来已经过时了。这是4年前的东西。 - Dustin
@ Dustin 抱歉,我并不是在询问,而是为其他可能随后到来的人做笔记。你的答案是在互联网上编程方式调度 CeleryBeat 的 PeriodicTask 的最简单示例!其余的使用配置文件或只给出部分答案。(我正在使用 djcelery - 我想那是数据库后端来自的地方,我 from djcelery.models import *.) - Chris
虽然抱歉,需要澄清一下,“PeriodicTask”在Celery和Djcelery中都存在,正如你所暗示的那样——我的使用了Djcelery版本。 - Chris

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接