Flask,逐个处理请求

18

我有一个 Flask 应用程序,它监听一些任务。这个过程非常漫长(比如说1分钟),我不想同时处理两个请求。

如果收到一个请求后,能够关闭 Flask 监听的端口,并在完成后重新打开它,那就太好了。或者我可以设置一个信号量,但是我不确定 Flask 是否会并发运行。

有什么建议吗?

from flask import Flask, request
app = Flask(__name__)

@app.route("/",methods=['GET'])
def say_hi():
    return "get not allowed"

@app.route("/",methods=['POST'])
def main_process():
    # heavy process here to run alone
    return "Done"

if __name__ == "__main__":
    app.run(debug=True,host='0.0.0.0')

你打算如何运行Flask?是直接通过Flask还是将其作为WSGI模块运行? - Georg Schölly
我正在使用wsgi模块。 - mosh442
1
在这种情况下可能会更加复杂。一个WSGI服务器(取决于配置)可以并行生成多个进程,但是Python的锁只能在线程之间工作,而不能跨进程工作。您需要引入一个共享资源,您可以锁定该资源。这可以是数据库、文件或共享锁,例如命名信号量。 - Georg Schölly
我看到这是一个老问题,但是我通过引入一个负载均衡器来解决了这个问题,该负载均衡器将请求重定向到“未繁忙”的服务器。 - Kots
1
根据我阅读的https://docs.python.org/3/library/multiprocessing.html#synchronization-primitives,使用`multiprocessing`库中的`Lock`对象似乎是在WSGI生成的所有线程和/或进程中获得锁的正确方法。 - chrisinmtown
3个回答

15

你可以使用信号量来实现:

import threading
import time
sem = threading.Semaphore()

@app.route("/",methods=['POST'])
def main_process():
    sem.acquire()
    # heavy process here to run alone
    sem.release()
    return "Done"

信号量的使用是用于控制对共享资源的访问。

您可以在此处查看有关信号量的更多信息。

这个 SO 问题也可以帮助您,点击此处

编辑:

正如 Georg Schölly 在评论中所写,上述解决方案在多个服务的情况下存在问题。

尽管如此,您可以使用 WSGI 来实现您的目标。

@app.route("/",methods=['POST'])
def main_process():
    uwsgi.lock()
    # Critical section
    # heavy process here to run alone
    uwsgi.unlock()
    return "Done"

uWSGI支持可配置数量的锁,您可以使用这些锁来同步工作进程。

了解更多信息,请阅读此处


3
如果存在多个进程,则此方法无效。这里的锁定在多个进程间不起作用,而这通常是Web服务器的常规配置。 - Georg Schölly
3
@GeorgSchölly,感谢您的评论。我已添加编辑部分以涉及到您的评论。 - omri_saadon
只是为了完全理解,当您说多个进程时,您是指运行多个Python脚本并使锁定在所有脚本之间工作吗?而Semaphore和Lock不能做到这一点? - Rahim Khoja

-2
你可以尝试添加一个threading.Lock来表示有些工作已经在进行中:
import threading
from contextlib import ExitStack

busy = threading.Lock()
@app.route("/",methods=['POST'])
def main_process():
    if not busy.acquire(timeout = 1):
        return 'The application is busy, refresh the page in a few minutes'

    # ensure busy.release() is called even if an exception is thrown
    with ExitStack() as stack:
        stack.callback(busy.release)
        # heavy process here to run alone

    return "Done"

但是 Flask 默认只允许一次请求被处理(更多信息在这里),所以如果您可以接受在处理单个请求期间,所有其他用户的页面都无法加载直到进程完成(甚至可能会出现请求超时错误)的事实,那么您不需要改变任何东西。
如果您想让其他用户得到消息,就像上面的代码一样,将工作人员数量增加到2,这样当一个工作人员处理请求时,另一个工作人员会阻止其他人。


没有锁定,这很容易出现竞态条件。 - Georg Schölly
1
如果有多个进程,则此方法无效。此处的锁定不适用于跨多个进程,而这是Web服务器的正常配置。 - Georg Schölly
@GeorgSchölly,你可以传递一个threaded选项告诉服务器它应该在线程模式下工作。 - illright
1
没错,但这假设应用程序通过Flask集成的服务器运行。特别是在生产模式下,这种情况很少见。Mosh在他的问题评论中指出,他通过WSGI使用它。 - Georg Schölly

-2
我测试了一下,结果和预期一样。我建议小心使用,因为这可能会锁定服务器上的线程并使其超负荷运行。经过重新考虑,我决定在我的环境中不需要它。
def single_threaded_endpoint(f):
    lock = Lock()

    @wraps(f)
    def decorated_function(*args, **kwargs):
        with lock:
            return f(*args, **kwargs)

    return decorated_function

@single_threaded_endpoint
def stripe_notification():
    ...

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接