Django - Run a function every x seconds

41
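I'm working on a Django app. I have an API endpoint which, when requested, must carry out a function that has to be repeated a few times (until a certain condition is met). This is how I'm handling it right now -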
def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

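I know this is a terrible approach and that I shouldn't be blocking for 2 seconds at a time, but I can't figure out how to get around it. It does work, after a wait of about 4 seconds. What I'd like is something that keeps the loop running in the background and stops once some_fn returns True. (Also, some_fn is certain to return True eventually.)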

EDIT -
Reading Oz123's answer gave me an idea which seems to work. Here's what I did -
def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.daemon = True  # setDaemon() is deprecated since Python 3.10
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

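This does the job for me. It's simple, but I don't know how efficient multithreading is in conjunction with Django.
If anyone can point out pitfalls of this approach, criticism is welcome.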
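One pitfall that can be seen directly in the loop above is that nothing can stop it from outside, and it would spin forever if the job never succeeded. The following is a minimal sketch of the same background-polling idea with a stop signal and an upper bound on attempts, using only the standard library. The names `poll_until`, `start_polling`, `check` and `max_attempts` are hypothetical and chosen for illustration; `check` stands in for whatever some_fn does.

```python
import threading


def poll_until(check, interval=2.0, max_attempts=30, stop_event=None):
    """Call check() every `interval` seconds until it returns a truthy value.

    Returns the number of attempts made, or None if polling was stopped
    or max_attempts was exhausted first.
    """
    stop_event = stop_event or threading.Event()
    for attempt in range(1, max_attempts + 1):
        if check():
            return attempt
        # wait() pauses like time.sleep(), but returns True early if stopped
        if stop_event.wait(interval):
            return None
    return None


def start_polling(check, **kwargs):
    """Run poll_until in a daemon thread; returns (thread, stop_event)."""
    stop_event = threading.Event()
    thread = threading.Thread(
        target=poll_until,
        args=(check,),
        kwargs={**kwargs, "stop_event": stop_event},
        daemon=True,
    )
    thread.start()
    return thread, stop_event
```

Calling `stop_event.set()` ends the loop at the next wait. Because the thread is a daemon, it is also dropped without cleanup when the server process exits, so work that must survive a restart probably needs something more durable than this.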

1
You can use celery for this. You can find a guide here: https://realpython.com/blog/python/asynchronous-tasks-with-django-and-celery/#periodic-tasks - neverwalkaloner
@neverwalkaloner That sounds like exactly what I need. I'll try it out, thanks. :) - Zeokav
1
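As @neverwalkaloner mentioned, you can use celery. It lets you set up periodic task schedules and is very flexible; see the docs at http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html and take a look at the crontab module. - julian salas
5 Answers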

53

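For many small projects celery is overkill. For those projects you can use schedule, which is very easy to use.

With this library you can make any function run periodically: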

import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1) 

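This example runs in a blocking manner, but if you look at the FAQ, you will find that you can also run jobs in a parallel thread so that nothing is blocked, and remove a job once it is no longer needed: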

import threading    
import time 

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run


Scheduler.run_continuously = run_continuously

Here is an example of usage inside a class method:
    def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds
    
    ...
    # later on foo is not needed any more:
    self._job_stop.set()
    
    ...
    
    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()
    

2
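Actually, I was looking for a way to avoid Celery (for now) because I only need scheduling in one or two places in my project. I just stumbled upon something like this, haha! I'll try to implement it and get back if I have any questions. Thanks for the reply! :) - Zeokav
I've posted an answer below. I'd appreciate it if you could tell me whether it might cause problems later on. - Zeokav
Interesting convention - why not create a class that inherits from Scheduler and adds the run_continuously method? That would make it clearer what is going on. - Tim
This example involves classes, so it gets more challenging. - oz123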
13
Hi, I don't understand this... How would I implement it in my Django project? - Mohamed Benkedadra

25
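This answer expands on Oz123's answer a little.
To get things working, I created a file called mainapp/jobs.py to hold my scheduled jobs. Then, in my apps.py module, I put from . import jobs in the ready method. Here is my entire apps.py file: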
from django.apps import AppConfig
import os

class MainappConfig(AppConfig):
    name = 'mainapp'

    def ready(self):
        from . import jobs

        if os.environ.get('RUN_MAIN', None) != 'true':
            jobs.start_scheduler()

(The RUN_MAIN check is there because python manage.py runserver runs the ready method twice, once in each of two processes, but we only want to start the scheduler once.)
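To make the behaviour of that check easier to see, it can be pulled out into a small helper. This is only a sketch: `should_start_scheduler` is a hypothetical name, not part of Django. It relies on the runserver autoreloader setting RUN_MAIN to 'true' in the child process it spawns, while the parent process that watches for file changes does not have it set.

```python
import os


def should_start_scheduler(environ=None):
    """Return True in every process except runserver's reloaded child."""
    environ = os.environ if environ is None else environ
    # The autoreloader sets RUN_MAIN='true' only in the child it spawns
    return environ.get('RUN_MAIN') != 'true'
```

With this condition the scheduler is started in the watching parent process under runserver. Under a server that never sets RUN_MAIN, the function returns True in every process that calls ready, so each such process would start its own scheduler.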

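Now, here is what I put in my jobs.py file. First, the imports. You'll need to import Scheduler, threading and time as shown below. The F and UserHolding imports are only needed for what my job does; you won't need them.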

from django.db.models import F
from schedule import Scheduler
import threading
import time

from .models import UserHolding

Next, write the function you want to schedule. The following is purely an example; your function won't look anything like this.

def give_admin_gold():
    admin_gold_holding = (UserHolding.objects
        .filter(inventory__user__username='admin', commodity__name='gold'))

    admin_gold_holding.update(amount=F('amount') + 1)

Next, monkey-patch the schedule module by adding a run_continuously method to its Scheduler class, using the code below. The code is copied verbatim from Oz123's answer.
def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run

Scheduler.run_continuously = run_continuously

Finally, define a function that creates a Scheduler object, wires up your job, and calls the scheduler's run_continuously method.

def start_scheduler():
    scheduler = Scheduler()
    scheduler.every().second.do(give_admin_gold)
    scheduler.run_continuously()

Many thanks to @tanner-reinstate-lgbt-people for making this more beginner-friendly. - user1835157

3
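I recommend using Celery's task management. You can refer to this to set up the app (a package, if you come from a JavaScript background).
Once it is set up, you can change your code to: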
@app.task
def check_shut_down():
    if not some_fun():
        # add task that'll run again after 2 secs
        check_shut_down.apply_async(countdown=2)
    else:
        # task completed; do something to notify yourself
        return True
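The task above re-enqueues itself until the check succeeds. The same self-rescheduling pattern can be illustrated without a broker using the standard library's threading.Timer. This is an analogy under stated assumptions, not Celery's API: `retry_until_done`, `check` and `on_done` are hypothetical names, and unlike a Celery task the pending retry is lost if the process exits.

```python
import threading


def retry_until_done(check, delay=2.0, on_done=None):
    """Run check(); if it is falsy, schedule another attempt after `delay` s."""
    if check():
        if on_done is not None:
            on_done()  # notify the caller that the job finished
        return
    # Similar in spirit to re-enqueueing the task with a countdown
    timer = threading.Timer(delay, retry_until_done, args=(check, delay, on_done))
    timer.daemon = True
    timer.start()
```

Each attempt runs in a fresh timer thread and returns immediately, so nothing sleeps in a loop between attempts.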

3

I can't comment on the excellent posts by oz123 (https://dev59.com/dVcP5IYBdhLWcg3wFmgw#44897678) and Tanner Swett (https://dev59.com/dVcP5IYBdhLWcg3wFmgw#60244694), but as a final note I want to add that if you use Gunicorn and have X workers, this code:

from django.apps import AppConfig
import os

class MainappConfig(AppConfig):
    name = 'mainapp'

    def ready(self):
        from . import jobs

        if os.environ.get('RUN_MAIN', None) != 'true':
            jobs.start_scheduler()

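will be executed that many times, launching X schedulers at the same time.

If we only want a single instance to run (for example, because the job creates objects in the database), we need to add something like the following to our gunicorn.conf.py file: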

def on_starting(server):
    from app_project import jobs
    jobs.start_scheduler()

Finally, add the --preload argument to the gunicorn call.


0
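Here is my solution, with sources noted in the code. It lets you create a scheduler that you can start at any time and add jobs to or remove jobs from. The run_pending_interval parameter lets you trade off system resources against how promptly jobs are executed.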
from schedule import Scheduler
import threading
import warnings
import time


class RepeatTimer(threading.Timer):
    """Add repeated run of target to timer functionality. Source: https://dev59.com/zGcs5IYBdhLWcg3w-Yr4#48741004"""
    running: bool = False

    def __init__(self, *args, **kwargs):
        threading.Timer.__init__(self, *args, **kwargs)

    def start(self) -> None:
        """Protect from running start method multiple times"""
        if not self.running:
            super(RepeatTimer, self).start()
            self.running = True
        else:
            warnings.warn('Timer is already running, cannot be started again.')

    def cancel(self) -> None:
        """Protect from running stop method multiple times"""
        if self.running:
            super(RepeatTimer, self).cancel()
            self.running = False
        else:
            warnings.warn('Timer is already canceled, cannot be canceled again.')

    def run(self):
        """Replace run method of timer to run continuously"""
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)


class ThreadedScheduler(Scheduler, RepeatTimer):
    """Non-blocking scheduler. Advice taken from: https://dev59.com/22kw5IYBdhLWcg3w6Oly#50465583"""
    def __init__(
            self,
            run_pending_interval: float,
    ):
        """Initialize parent classes"""
        Scheduler.__init__(self)
        super(RepeatTimer, self).__init__(
            interval=run_pending_interval,
            function=self.run_pending,
        )


def print_work(what_to_say: str):
    print(what_to_say)


if __name__ == '__main__':
    my_schedule = ThreadedScheduler(run_pending_interval=1)
    job1 = my_schedule.every(1).seconds.do(print_work, what_to_say='Did_job1')
    job2 = my_schedule.every(2).seconds.do(print_work, what_to_say='Did_job2')
    my_schedule.cancel()
    my_schedule.start()
    time.sleep(7)
    my_schedule.cancel_job(job1)
    my_schedule.start()
    time.sleep(7)
    my_schedule.cancel()

Content provided by Stack Overflow.