如何在FastAPI中关闭循环?

6

我有一个路由/,它启动了一个无限循环(从技术上讲,直到WebSocket断开连接,但在这个简化的示例中它是真正无限的)。 如何在关闭时停止这个循环:

from fastapi import FastAPI

import asyncio

app = FastAPI()
running = True

@app.on_event("shutdown")
def shutdown_event():
    global running
    running = False

@app.get("/")
async def index():
    while running:
        await asyncio.sleep(0.1)

根据文档@app.on_event("shutdown")应该在关闭时被调用,但我怀疑它被调用的方式类似于生命周期事件,在所有操作完成后才被调用,这在这种情况下会导致死锁。
测试方法如下:
  1. 运行命令:uvicorn module.filename:app --host 0.0.0.0
  2. 使用curl访问http://ip:port/
  3. 停止服务器(按下CTRL+C
你会发现它永远挂起,因为shutdown_event没有被调用,所以running从未被设置为false。(是的,你可以通过按下CTRL+C来强制关闭。)

好的,我在问题中添加了更多信息,是的,我通过 CTRL+C 停止它。 - Sir l33tname
我并不过分关注程序上的强制关闭,我更喜欢一种干净而有序的关闭方式,但如果在第一个“CTRL+C”之后它崩溃了整个程序,那也比不停止好。 - Sir l33tname
@chris 我的意思是,如果你使用sys.exit(),那么会存在“问题”,但是Github上的解决方案是合作的,只是确保无限循环在关闭时停止,这正是我想要的。我不明白这如何会影响其他请求,如果你测试示例,你会发现客户端获得的是null而不是错误。 - Sir l33tname
仅因running被设置为False且没有调用sys.exit(),否则客户端将收到内部服务器错误响应而返回null。你问如何在关闭时停止循环。那么,使用该方法,当按下CTRL+C时,你的应用程序是否实际上已关闭?我猜没有,因为控制台中的应用程序仍在运行。因此,你需要调用sys.exit(),如果像之前描述的那样这样做,应用程序将被强制退出,而不会先完成已经运行的事件循环中的任务。 - Chris
啊,我明白了,这个解决方案适合我,我可以协作地停止循环,但如果睡眠时间更长,这将是一个问题,因为它不会被中止,但在 Github 问题描述的解决方案解决了我的具体问题。 - Sir l33tname
显示剩余5条评论
3个回答

1
import signal
import asyncio
from fastapi import FastAPI

app = FastAPI()
running = True

def stop_server(*args):
    global running
    running = False

@app.on_event("startup")
def startup_event():
    signal.signal(signal.SIGINT, stop_server)

@app.get("/")
async def index():
    while running:
        await asyncio.sleep(0.1)

来源: https://github.com/tiangolo/fastapi/discussions/9373#discussioncomment-5573492

设置和捕获 SIGINT 信号可以捕获第一个 CNTR+C。 这将把 running 设置为 False,从而结束 index() 中的循环。 终止正在运行的请求,允许服务器关闭。


0

我以为这是一件简单的事情,但事实并非如此 :-) 我认为这甚至值得在FastAPI上提出“预关机”事件的功能请求,因为如果嵌入代码中,它可能很简单。

所以,当运行时,uvicorn会向事件循环注册一个回调以在请求退出时执行。当调用一次时,它将更改uvicorn Server对象中的状态(它将server.should_exit属性设置为True)。因此,如果您有一种清洁的方法来获取服务器实例正在运行,则可以在长时间查看中轮询该属性以查看是否应该退出。我找不到任何方法来获取正在运行的服务器的引用。

所以,我决定注册另一个信号处理程序:您可以在应用程序中拥有该处理程序以根据需要更改状态。但问题在于:asyncio每个信号只能有一个处理程序,当一个人注册处理程序时,前一个处理程序就消失了。这意味着安装自定义处理程序将删除uvicorn的处理程序,并且它根本不会关闭。

为了解决这个问题,我必须检查运行中的asyncio循环中的loop._signal_handlers:这被认为是私有的,但通过这样做,我可以在自定义信号处理程序后链接原始信号处理程序的调用。

长话短说,这段代码可以在第一次按下“ctrl + C”时退出服务器:
from fastapi import FastAPI, Request
import asyncio

from uvicorn.server import HANDLED_SIGNALS
from functools import partial

app = FastAPI()
running = True

#@app.on_event("shutdown")
#def shutdown_event():
    #global running
    #running = False

@app.get("/")
async def index(request: Request):
    while running:
        await asyncio.sleep(0.1)

@app.on_event("startup")
def chain_signals():
    loop = asyncio.get_running_loop()
    loop = asyncio.get_running_loop()
    signal_handlers = getattr(loop, "_signal_handlers", {})  # disclaimer 1: this is a private attribute: might change without notice.
                                                            # Also: unix only, won't work on windows
    for sig in HANDLED_SIGNALS:
        loop.add_signal_handler(sig, partial(handle_exit, signal_handlers.get(sig, None))  , sig, None)

def handle_exit(original_handler, sig, frame):
    global running
    running = False
    if original_handler:
        return original_handler._run()   # disclaimer 2: this should be opaque and performed only by the running loop. 
                                         # not so bad: this is not changing, and is safe to do. 



我想强调的是,我能够得到这个工作的片段,是因为你提供了一个最小化的可工作示例来说明你的问题。你会惊讶于有多少问题的作者没有这样做。

我担心这不是一个跨平台的解决方案,应该在回答的描述中明确说明(而不是在代码注释中)。正如Python文档中给出的示例所述,使用loop.add_signal_handler()为信号注册处理程序仅适用于Unix。 - Chris
1
我创建了一个问题,因为拥有不需要内部解决方案会很不错 :) https://github.com/tiangolo/fastapi/discussions/9373,我测试了你的解决方案,对我来说它终止了循环,但似乎它实际上从未关闭服务器,直到我手动杀死该进程。 - Sir l33tname
对于Windows系统,可以忽略asyncio的add_signal_handler并直接使用signal.signal添加处理程序。这也是FastAPI内部所做的。 - jsbueno
在Windows上,使用signal.signal()函数定义自定义处理程序需要在其中运行sys.exit()以终止应用程序;否则,应用程序将继续运行(按下CTRL+C后)。这实际上是运行(在Windows上)最近由提问者发布的示例时发生的情况。 OP提到,在Fedora上,应用程序退出;尽管如此,该示例并未提供优雅的关闭,而是提供了硬关闭(即强制应用程序退出),这意味着在退出之前不会给任何挂起的任务(例如请求和后台任务)完成的时间。 - Chris
这就是为什么需要添加一个async处理程序的原因,循环可以被停止并给待处理任务完成的时间 - 这是我过去几天一直在努力解决的问题。顺便说一下,signal.signal()不是FastAPI内部使用的,而是Uvicorn使用的(请参见此处)。 - Chris
快速API安装的处理程序将负责不退出,然后在第二次尝试时退出。 它们只需在用户安装的处理程序之后链接即可,就像我为循环绑定的处理程序示例中所做的那样。然而,这不是一本书的章节或生产代码,OP似乎没有使用Windows:如果认为需要一个工作的Windows示例,请随意添加。 (无论如何,我也不知道如何断言它是否有效) - jsbueno

0
如@jsbueno和其他人指出的,安装第二个信号处理程序是有问题的。下面是一个完整的模板程序,它可以在uvicorn/FastAPI生态系统中运行,并且可以通过调用URL(/shutdown或/restart)或发送信号(例如SIGINT)来关闭。你需要将其保存为"main.py"才能使其工作。
根据程序的关闭方式返回不同的退出代码(对于/shutdown或/restart分别为0或1)。我在一个包装程序中使用这些返回代码来重新启动服务器,如果这是被请求的操作。
from fastapi import FastAPI
import uvicorn
import time
import asyncio
import signal
import sys
import os


class RuntimeVals:
    shutdown = False
    restart = False
    shutdown_complete = False


runtime_cfg = RuntimeVals()
app = FastAPI()


async def worker(n):
    while not runtime_cfg.shutdown:
        await asyncio.sleep(0.1)
    if n == 1:
        raise RuntimeError("This is a demo error in worker 1")
    else:
        print(f"Worker {n} shutdown cleanly")

async def mainloop():
    loop = asyncio.get_running_loop()
    done = []
    pending = [loop.create_task(worker(1)), loop.create_task(worker(2))]

    # Handle results in the order the task are completed
    # if exeption you can handle that as well.
    while len(pending) > 0:
        done, pending = await asyncio.wait(pending)
        for task in done:
            e = task.exception()
            if e is not None:
                # This will print the exception as stack trace
                task.print_stack()
            else:
                result = task.result()
    # This is needed to kill the Uvicorn server and communicate the
    # exit code
    if runtime_cfg.restart:
        print("RESTART")
    else:
        print("SHUTDOWN")
    runtime_cfg.shutdown_complete = True
    os.kill(os.getpid(), signal.SIGINT)


@app.get("/shutdown")
async def clean_shutdown():
    runtime_cfg.shutdown = True


@app.get("/restart")
async def clean_restart():
    runtime_cfg.restart = True
    runtime_cfg.shutdown = True

@app.on_event("startup")
async def startup_event():
    loop = asyncio.get_running_loop()
    loop.create_task(mainloop())


@app.on_event("shutdown")
async def shutdown_event():
    # This is a hook point where the event
    # loop has completely shut down
    runtime_cfg.shutdown = True
    while runtime_cfg.shutdown_complete is False:
        print("waiting")
        await asyncio.sleep(1)


if __name__ == "__main__":
    uv_cfg = uvicorn.Config(
        "main:app",
        host="0.0.0.0",
        port=8000,
        log_level="debug",
        timeout_graceful_shutdown=2,
    )
    server = uvicorn.Server(config=uv_cfg)
    server.run()
    import main
    if main.runtime_cfg.restart:
        sys.exit(1)
    else:
        sys.exit(0)

大部分的操作都发生在mainloop中,它是从startup_event中调用的。Mainloop创建子任务并监视它们,根据需要收集和打印异常。每个子任务在runtime_cfg.shutdown为True时应返回。
关闭程序有两种方式:通过信号或通过URL(/shutdown或/restart)。如果使用SIGTERM/SIGINT信号,shutdown_event将设置runtime_cfg.shutdown = True然后等待。每个工作进程退出;一旦所有工作进程都停止,mainloop设置runtime_cfg.shutdown_complete = True,这允许shutdown_event返回并使uvicorn退出。mainloop发送的第二个信号被忽略,因为uvicorn服务器已经在关闭过程中。
如果调用了URL,它将设置runtime_cfg.shutdown = True。每个工作进程退出,mainloop向程序发送SIGINT信号,这会导致ucivorn启动关闭过程。
一个非常棘手的问题是在程序结束时,当server.run()返回时。在主函数中设置的任何模块变量(例如runtime_cfg.restart),在服务器运行时被遮蔽。我相当确定这是因为uvicorn在启动时再次导入主函数(它在uvicorn.Config中使用"main:app")。这种遮蔽使得从服务器环境向主函数传递信息变得非常混乱。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接