使用后台线程的Flask应用程序

43

我正在创建一个Flask应用程序,对于某个请求,我需要运行一些不需要等待UI的长时间运行的作业。我将创建一个线程并向UI发送消息。 线程将计算并更新数据库。 但是,提交后UI将看到一条消息。 以下是我的实现,但它先运行线程,然后发送输出到UI,这不是我想要的方式。 我该如何在后台运行此线程?

@app.route('/someJob')
def index():
    t1 = threading.Thread(target=long_running_job)
    t1.start()
    return 'Scheduled a job'

def long_running_job
    #some long running processing here

如何使线程t1在后台运行并立即返回消息?

6个回答

104

请尝试此示例,经过 Python 3.4.3 / Flask 0.11.1 测试。

from flask import Flask
from time import sleep
from concurrent.futures import ThreadPoolExecutor

# DOCS https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
executor = ThreadPoolExecutor(2)

app = Flask(__name__)


@app.route('/jobs')
def run_jobs():
    executor.submit(some_long_task1)
    executor.submit(some_long_task2, 'hello', 123)
    return 'Two jobs were launched in background!'


def some_long_task1():
    print("Task #1 started!")
    sleep(10)
    print("Task #1 is done!")


def some_long_task2(arg1, arg2):
    print("Task #2 started with args: %s %s!" % (arg1, arg2))
    sleep(5)
    print("Task #2 is done!")


if __name__ == '__main__':
    app.run()

1
你能否在除了本地主机之外的任何Web服务器上运行它,比如gunicorn或apache-wsgi? - arshpreet
3
我几乎有相同的代码在生产环境中运行,从2016年开始通过uWSGI运行到现在,没有任何问题。使用Gunicorn进行测试也很好。 - Denys Synashko
以下是项目真实代码的相关行 - https://gist.github.com/itnow/af696c41d9e61a4aea5baf0e1098e305 - Denys Synashko
2
如果你想在Flask中使用concurrent.futures,请查看Flask-Executor。它提供了一种更符合惯例的在Flask中初始化执行器的方式,并提供了一些方便的功能(保留应用程序上下文,应用程序工厂模式支持,装饰器)。 - daveruinseverything
2
如果 some_long_task1some_long_task2 引发异常会怎么样?我写了类似的代码,但发现异常被抑制了。 - secsilm
显示剩余13条评论

27

看看 Flask-Executor,它在后台使用concurrent.futures使您的生活变得非常容易。

from flask_executor import Executor

executor = Executor(app)

@app.route('/someJob')
def index():
    executor.submit(long_running_job)
    return 'Scheduled a job'

def long_running_job
    #some long running processing here

这不仅可以在后台运行作业,还可以让它们访问应用程序上下文。它还提供了一种存储作业的方式,以便用户可以返回并获取状态。


它必须放在应用程序上下文中吗? - TomSawyer
这个非常好用,我已经使用这个执行器完成了一些示例 https://stackoverflow.com/questions/68411571/flask-socketio-emitting-a-pandas-dataframe-from-a-background-task-using-flask。但是,您能否指出一个可以同时运行多个后台任务的示例? - Sade

16

处理类似问题最好使用消息代理。在Python世界中有一些出色的软件专门用于处理这种情况:

两者都是非常不错的选择。

通过像您现在所做的方式生成线程通常不是一个好主意,因为这可能会导致处理传入请求时出现问题等等。

如果您查看 celery 或 RQ 入门指南,它们将向您详细介绍正确的操作方法!


7
是的,我知道关于celery和redis队列的事情。但是,我正在尝试将它们用作简单的线程后台作业。如果我使用线程,不确定会出现什么问题。你能解释一下吗?此外,如果我想以我编码的方式工作,需要做哪些更改?这完全不可能吗? - San
我对了解这如何影响请求处理很感兴趣。 - Konrad
@San,我想这是因为Python使用GIL,而Python的线程并不是传统意义上的线程。即使是多线程应用程序,在Python中仍然是单线程的。Python中的线程主要有助于IO操作,但除此之外的任何操作实际上都可能影响请求处理。而且Python中的线程并不轻量级,因此它们会占用大量资源。 - winwin

1
如果您想在Flask应用程序上下文中执行长时间运行的操作,那么(与使用ThreadPoolExecutor并处理异常相比)这会更容易一些:
  1. 为您的应用程序定义一个命令行(cli.py)-因为所有Web应用程序都应该有一个管理cli
  2. 在Web请求中使用subprocess.Popen(不等待)打开命令行。
例如:
# cli.py

import click
import yourpackage.app
import yourpackage.domain

app = yourpackage.app.create_app()

@click.group()
def cli():
    pass

@click.command()
@click.argument('foo_id')
def do_something(foo_id):
    with app.app_context():
        yourpackage.domain.do_something(foo_id)

if __name__ == '__main__':
    cli.add_command(do_something)
    cli()

然后,
# admin.py (flask view / controller)

bp = Blueprint('admin', __name__, url_prefix='/admin')

@bp.route('/do-something/<int:foo_id>', methods=["POST"])
@roles_required('admin')
def do_something(foo_id):
    yourpackage.domain.process_wrapper_do_something(foo_id)
    flash("Something has started.", "info")
    return redirect(url_for("..."))

并且:

# domain.py

import subprocess

def process_wrapper_do_something(foo_id):
    command = ["python3", "-m", "yourpackage.cli", "do_something", str(foo_id)]
    subprocess.Popen(command)

def do_something(foo_id):
    print("I am doing something.")
    print("This takes some time.")

0

同意@rdegges标记的答案。很抱歉我的账户没有足够的信用在答案下添加评论,但我想明确“为什么使用消息代理而不是生成线程(或进程)”。

关于ThreadPoolExecutor和flask_executor的其他答案正在创建一个新的线程(或进程,因为flask_executor能够执行)来执行“long_running_job”。这些新的线程/进程将具有与主网站相同的上下文:

对于线程:新线程将能够访问网站应用程序的上下文,更改内容或破坏它,如果此线程引发异常; 对于进程:新进程将拥有网站应用程序上下文的副本。如果网站在初始化时某种方式使用了大量内存,则新进程也将拥有其副本,即使该进程不打算利用该内存部分。

另一方面,如果您使用消息代理和另一个应用程序检索作业消息以处理它,则新应用程序将与网站应用程序无关,也不会复制Web应用程序中的内存。

将来,当您的应用程序足够大时,可以将应用程序放置在另一个服务器(或多个服务器)中,轻松扩展。


0

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接