如何在Flask上每小时运行一次函数的计划?

157

我有一个Flask网站托管在没有访问cron命令的环境下。

我该如何每小时执行某些Python函数?

10个回答

170

您可以使用来自APScheduler包(v3.5.3)的BackgroundScheduler()

import time
import atexit

from apscheduler.schedulers.background import BackgroundScheduler


def print_date_time():
    print(time.strftime("%A, %d. %B %Y %I:%M:%S %p"))


scheduler = BackgroundScheduler()
scheduler.add_job(func=print_date_time, trigger="interval", seconds=60)
scheduler.start()

# Shut down the scheduler when exiting the app
atexit.register(lambda: scheduler.shutdown())

请注意,当Flask处于调试模式时,这两个调度程序将会被启动。如需更多信息,请查看问题。


1
@user5547025 请问调度器是如何工作的?如果我已经将内容放在schedule.py中,它会如何自动运行? - Kishan Mehta
2
我认为用户5547025建议的计划适用于同步任务,可能会阻塞主线程。您需要启动一个工作线程,以避免它被阻塞。 - Simon
1
如果flask有一个App.runonceApp.runForNseconds,你可以在schedule和flask运行器之间切换,但事实并非如此,所以现在唯一的方法是使用这个。 - lurscher
2
@Dewsworld 为什么在最后一行使用 lambda? 为什么不直接使用 atexit.register(scheduler.shutdown),而不是 atexit.register(lambda: scheduler.shutdown()) - Andrei Prãdan
APScheduler最新版本(v.3.8.1)的一个缺点是它依赖于[backports.zoneinfo, pytz-deprecation-shim],这使得在带有Python 3.8的ARM环境中使用变得更加困难。 - Altair7852
显示剩余4条评论

116

我对应用程序调度器的概念还有点陌生,但我在APScheduler v3.3.1这里找到了一些不太一样的东西。我相信对于最新版本,软件包结构、类名称等都已经发生了变化,因此我在这里提供一个最近制作的新解决方案,与基本的Flask应用程序集成:

#!/usr/bin/python3
""" Demonstrating Flask, using APScheduler. """

from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

def sensor():
    """ Function for test purposes. """
    print("Scheduler is alive!")

sched = BackgroundScheduler(daemon=True)
sched.add_job(sensor,'interval',minutes=60)
sched.start()

app = Flask(__name__)

@app.route("/home")
def home():
    """ Function for test purposes. """
    return "Welcome Home :) !"

if __name__ == "__main__":
    app.run()

如果有人对这个示例的更新感兴趣,我也在这里留下了Gist

以下是一些参考资料,供日后阅读:


3
很好,希望随着更多人看到这个帖子,它能够得到更高的投票排名。 - Mwspencer
1
你尝试过在像PythonAnywhere之类的Web应用程序上使用它吗? - Mwspencer
1
谢谢,@Mwspencer。是的,我已经使用过了,它运行得很好:),虽然我建议您探索apscheduler.schedulers.background提供的更多选项,因为您的应用程序可能会遇到其他有用的场景。问候。 - ivanleoncz
2
当应用程序退出时,请不要忘记关闭调度程序。 - Hanynowsky
2
你好!你能给一些关于有多个gunicorn工作进程的情况下的建议吗?我的意思是,调度程序会每个工作进程执行一次吗? - ElPapi42
显示剩余4条评论

70
你可以在Flask应用程序中利用APScheduler并通过其界面运行你的任务:

你可以在Flask应用程序中利用APScheduler,通过其接口运行你的任务:

import atexit

# v2.x version - see https://dev59.com/h2Ei5IYBdhLWcg3wfsfI#38501429
# for the 3.x version
from apscheduler.scheduler import Scheduler
from flask import Flask

app = Flask(__name__)

cron = Scheduler(daemon=True)
# Explicitly kick off the background thread
cron.start()

@cron.interval_schedule(hours=1)
def job_function():
    # Do your work here


# Shutdown your cron thread if the web process is stopped
atexit.register(lambda: cron.shutdown(wait=False))

if __name__ == '__main__':
    app.run()

1
我可以问一个初学者的问题吗?为什么在atexit.register中有lambda - Pygmalion
3
因为atexit.register需要调用一个函数。如果我们只是传递cron.shutdown(wait=False),那么我们将会传递调用cron.shutdown的结果(可能是None)。因此,我们传递一个零参数函数,而不是给它命名并在语句def shutdown(): cron.shutdown(wait=False)atexit.register(shutdown)中使用。相反,我们使用lambda:在内联中注册它(这是一个零参数函数表达式)。 - Sean Vieira
谢谢。所以问题是我们想要向函数传递参数,如果我理解正确的话。 - Pygmalion

22

我尝试使用Flask替代简单的apscheduler,需要安装的是

pip3 install flask_apscheduler

以下是我的代码示例:

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler()

def scheduleTask():
    print("This test runs every 3 seconds")

if __name__ == '__main__':
    scheduler.add_job(id = 'Scheduled Task', func=scheduleTask, trigger="interval", seconds=3)
    scheduler.start()
    app.run(host="0.0.0.0")

19

对于一个简单的解决方案,您可以添加一个例如以下路由:

@app.route("/cron/do_the_thing", methods=['POST'])
def do_the_thing():
    logging.info("Did the thing")
    return "OK", 200

然后添加一个Unix cron工作,定期地向这个端点发送POST请求。例如,要每分钟运行一次,请在终端中键入crontab -e并添加这行:

* * * * * /opt/local/bin/curl -X POST https://YOUR_APP/cron/do_the_thing
< p >请注意,curl的路径必须完整,因为当作业运行时,它将没有您的PATH。您可以通过< code> which curl 找出系统上curl的完整路径。< /em>< /p>

我喜欢这个方法,因为易于手动测试,没有额外的依赖项,并且由于没有什么特别的事情发生,因此易于理解。

安全性

如果要对cron job进行密码保护,可以通过< code> pip install Flask-BasicAuth 进行安装,并将凭据添加到应用程序配置中:

app = Flask(__name__)
app.config['BASIC_AUTH_REALM'] = 'realm'
app.config['BASIC_AUTH_USERNAME'] = 'falken'
app.config['BASIC_AUTH_PASSWORD'] = 'joshua'

为了对作业端点进行密码保护:

from flask_basicauth import BasicAuth
basic_auth = BasicAuth(app)

@app.route("/cron/do_the_thing", methods=['POST'])
@basic_auth.required
def do_the_thing():
    logging.info("Did the thing a bit more securely")
    return "OK", 200

然后从您的 cron job 中调用它:

* * * * * /opt/local/bin/curl -X POST https://falken:joshua@YOUR_APP/cron/do_the_thing

这种方法的问题在于很难记录应用程序是否有定时组件。话虽如此,我以前做过这个。 - Sid Kwakkel
包括像“此函数从cron调用”这样的注释是有帮助的,但我知道你的意思。我有一些代码,我不会很快地回想起它是如何(以及是否)被定期执行的。 - Bemmu

15

你可以尝试使用APScheduler的BackgroundScheduler将间隔任务集成到你的Flask应用程序中。以下是使用蓝图和应用工厂(init.py)的示例:

from datetime import datetime

# import BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask

from webapp.models.main import db 
from webapp.controllers.main import main_blueprint    

# define the job
def hello_job():
    print('Hello Job! The time is: %s' % datetime.now())

def create_app(object_name):
    app = Flask(__name__)
    app.config.from_object(object_name)
    db.init_app(app)
    app.register_blueprint(main_blueprint)
    # init BackgroundScheduler job
    scheduler = BackgroundScheduler()
    # in your case you could change seconds to hours
    scheduler.add_job(hello_job, trigger='interval', seconds=3)
    scheduler.start()

    try:
        # To keep the main thread alive
        return app
    except:
        # shutdown if app occurs except 
        scheduler.shutdown()

希望能对你有所帮助 :)

参考链接:

  1. https://github.com/agronholm/apscheduler/blob/master/examples/schedulers/background.py

2
我相信return语句永远不会引发异常。 - Tamas Hegedus

6

另一个选择可能是使用 Flask-APScheduler,它与 Flask 配合得很好,例如:

  • 从 Flask 配置中加载调度程序配置,
  • 从 Flask 配置中加载作业定义

更多信息请参见:

https://pypi.python.org/pypi/Flask-APScheduler


5

您可以使用 flask-crontab 模块,这非常容易。

步骤 1: pip install flask-crontab

步骤 2:

from flask import Flask
from flask_crontab import Crontab

app = Flask(__name__)
crontab = Crontab(app)

步骤三:

@crontab.job(minute="0", hour="6", day="*", month="*", day_of_week="*")
def my_scheduled_job():
    do_something()

第四步:在命令提示符上,输入
flask crontab add

完成后,只需运行您的Flask应用程序,然后您可以检查您的函数将在每天6:00调用。

您可以参考此处(官方文档)。


不适用于Windows。出现错误:“没有名为'fcntl'的模块”,而且“fctnl模块在Windows系统上不可用”。 - leenremm

4

使用schedule和multiprocessing的完整示例,带有开关控制和参数运行run_job()。返回代码已简化,间隔设置为10秒,更改为every(2).hour.do()以进行2小时计划。Schedule非常出色,它不会漂移,我从未见过它在安排时超过100ms。使用multiprocessing而不是线程,因为它具有终止方法。

#!/usr/bin/env python3

import schedule
import time
import datetime
import uuid

from flask import Flask, request
from multiprocessing import Process

app = Flask(__name__)
t = None
job_timer = None

def run_job(id):
    """ sample job with parameter """
    global job_timer
    print("timer job id={}".format(id))
    print("timer: {:.4f}sec".format(time.time() - job_timer))
    job_timer = time.time()

def run_schedule():
    """ infinite loop for schedule """
    global job_timer
    job_timer = time.time()
    while 1:
        schedule.run_pending()
        time.sleep(1)

@app.route('/timer/<string:status>')
def mytimer(status, nsec=10):
    global t, job_timer
    if status=='on' and not t:
        schedule.every(nsec).seconds.do(run_job, str(uuid.uuid4()))
        t = Process(target=run_schedule)
        t.start()
        return "timer on with interval:{}sec\n".format(nsec)
    elif status=='off' and t:
        if t:
            t.terminate()
            t = None
            schedule.clear()
        return "timer off\n"
    return "timer status not changed\n"

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

您可以通过执行以下命令进行测试:

$ curl http://127.0.0.1:5000/timer/on
timer on with interval:10sec
$ curl http://127.0.0.1:5000/timer/on
timer status not changed
$ curl http://127.0.0.1:5000/timer/off
timer off
$ curl http://127.0.0.1:5000/timer/off
timer status not changed

每隔10秒钟计时器都会向控制台发出一个计时器消息:

127.0.0.1 - - [18/Sep/2018 21:20:14] "GET /timer/on HTTP/1.1" 200 -
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0117sec
timer job id=b64ed165-911f-4b47-beed-0d023ead0a33
timer: 10.0102sec

我不是多进程方面的专家,但如果你使用它,很可能会出现pickle错误。 - Patrick Mutuku
@PatrickMutuku,我看到数字序列化(cookies、临时文件)的唯一问题是异步和websockets,但是Flask不是你的api,请看https://github.com/kennethreitz/responder。Flask在Apache WSGI上通过简单的前端优秀地支持纯REST。 - MortenB

1
你可能希望使用一些队列机制和调度器,例如 RQ scheduler 或者更加复杂的 Celery(很可能过于复杂)。

这在您想要按请求或按用户帐户运行作业时非常有用,但对于琐碎的系统级任务,您始终可以使用 APScheduler 或仅设置 Cron 作业(假设您不是将 Docker 用于后者,因为这往往变得复杂,尝试运行进程管理器并设置记录器)。 - OzzyTheGiant

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接