FastAPI 在运行 API 调用时以串行方式而非并行方式运行。

40

我有以下代码:

import time
from fastapi import FastAPI, Request
    
app = FastAPI()
    
@app.get("/ping")
async def ping(request: Request):
        print("Hello")
        time.sleep(5)
        print("bye")
        return {"ping": "pong!"}

如果我在同一个浏览器窗口的不同标签页内运行我的代码(例如: http://localhost:8501/ping),我会得到以下结果:

Hello
bye
Hello
bye

改为:

Hello
Hello
bye
bye

我已经阅读了有关使用 httpx 的内容,但仍然无法实现真正的并行化。问题出在哪里?


采纳的答案是否对您有所帮助?我仍然遇到与您提出问题相同的行为。使用单个工作线程时,所有请求(同步或异步)始终按顺序运行,而不是并行运行。 - kevinarpe
说实话,我从来没有设置过 uvicorn 使用的工作进程数... 可能是这个原因吗?在没有定义的情况下,对我来说它们是并行工作而不是异步的。至少在 fastapi=0.85.0 版本中是这样的。 - Learning from masters
Chrome至少会阻止在相同的URL上进行并发GET请求(可能是为了有机会在下一次使用缓存版本?)在隐身模式下使用一个Chrome进行测试,同时可以使用“def”和“async def”。 - Robert Verdes
简而言之,将time.sleep()替换为await asyncio.sleep(),你就能实现并发。 - undefined
2个回答

98
根据FastAPI的文档所述:
当您使用普通的def而不是async def声明路径操作函数时,它会在一个外部线程池中运行,然后进行await,而不是直接调用(这样会阻塞服务器)。
此外,如此处所描述:
如果您正在使用与某些内容(数据库、API、文件系统等)进行通信的第三方库,并且该库不支持使用await(这是目前大多数数据库库的情况),那么请像平常一样声明您的路径操作函数,只需使用def
如果您的应用程序(以某种方式)不需要与其他任何内容进行通信并等待其响应,请使用async def
如果您不确定,就使用普通的def
注意:您可以在路径操作函数中混合使用defasync def,根据您的需要定义每个函数。FastAPI将会正确处理它们。
无论以上哪种情况,FastAPI仍然能够异步工作并且非常快速。
但是通过遵循上述步骤,它将能够进行一些性能优化。
因此,在FastAPI中,使用def定义的端点(在异步编程的上下文中,只使用def定义的函数称为同步函数)在一个单独的线程中运行,该线程来自外部线程池,然后进行await操作,因此,FastAPI仍然可以以异步方式工作。换句话说,服务器将同时处理对这些端点的请求。而async def定义的端点在事件循环中运行-在主(单个)线程上-也就是说,服务器还将同时/异步地处理对这些端点的请求,只要在这些async def端点/路由内部有对非阻塞I/O绑定操作的await调用,例如等待(1)从客户端通过网络发送的数据,(2)从磁盘读取文件的内容,(3)数据库操作完成等(请参见这里)。然而,如果使用async def定义的端点内部没有await操作,以便放弃时间让事件循环中的其他任务运行(例如,对相同或其他端点的请求,后台任务等),则必须完全完成对该端点的每个请求(即退出端点),然后将控制权返回给事件循环并允许其他任务运行。换句话说,在这种情况下,服务器将按顺序处理请求。请注意,相同的概念不仅适用于FastAPI端点,还适用于StreamingResponse的生成器函数(请参见StreamingResponse类实现),以及Background Tasks(请参见BackgroundTask类实现);因此,在阅读完本答案后,您应该能够决定是否应该使用defasync def定义FastAPI端点、StreamingResponse的生成器或后台任务函数。
关键字await(仅在async def函数内部有效)将函数控制权传递回事件循环。换句话说,它暂停了周围协程的执行(即调用async def函数的结果是一个协程对象),并告诉事件循环让其他任务运行,直到该await的任务完成。请注意,仅仅因为您可以使用async def定义自定义函数,并在async def端点中await它,并不意味着您的代码将异步工作,如果该自定义函数包含例如对time.sleep()、CPU密集型任务、非异步I/O库或任何与异步Python代码不兼容的阻塞调用。例如,在FastAPI中,当使用UploadFileasync方法时,如await file.read()await file.write(),FastAPI/Starlette在后台实际上会在外部线程池中运行这些文件对象的方法(使用async run_in_threadpool()函数)并await它;否则,这些方法/操作将阻塞事件循环。您可以通过查看UploadFile类的实现来了解更多信息。
请注意async并不意味着并行,而是并发。使用asyncawait的异步代码通常被总结为使用协程。协程是合作式(或协同式多任务)的,这意味着“在任何给定时间,一个拥有协程的程序只运行其中之一,并且该运行中的协程仅在显式请求挂起时才挂起执行”(有关协程的更多信息,请参见这里这里)。如本文所述
具体来说,每当当前正在运行的协程执行到一个await表达式时,该协程可能会被挂起,如果它所挂起的操作已经返回了一个值,那么之前被挂起的另一个协程可能会恢复执行。当异步迭代器的async for块请求下一个值或者async with块进入或退出时,也会发生挂起操作,因为这些操作在内部使用了await
然而,如果在async def函数/端点中直接执行/调用了阻塞的I/O操作或CPU密集型操作,它将会阻塞主线程(以及事件循环)。因此,在async def端点中使用像time.sleep()这样的阻塞操作将会阻塞整个服务器(就像你问题中提供的代码示例一样)。因此,如果你的端点不打算进行任何async调用,你可以使用def声明它,这样它将在外部线程池中运行,然后再进行await,如前面所述(在以下章节中还提供了更多解决方案)。示例:
@app.get("/ping")
def ping(request: Request):
    #print(request.client)
    print("Hello")
    time.sleep(5)
    print("bye")
    return "pong"

否则,如果您在端点内执行的函数是async函数,并且需要使用await来等待结果,那么您应该使用async def来定义您的端点。为了演示这一点,下面的示例使用了asyncio.sleep()函数(来自asyncio库),它提供了一个非阻塞的睡眠操作。await asyncio.sleep()方法将暂停周围协程的执行(直到睡眠操作完成),从而允许事件循环中的其他任务运行。类似的示例也可以在这里这里找到。
import asyncio
 
@app.get("/ping")
async def ping(request: Request):
    #print(request.client)
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"

无论是上述哪个端点,都会按照你的问题中提到的顺序将指定的消息打印到屏幕上——如果两个请求几乎同时到达的话。
Hello
Hello
bye
bye

重要提示

当您第二次(第三次等)调用终端时,请记得从与浏览器主会话隔离的标签页进行操作;否则,后续请求(即在第一个请求之后发出的请求)将被浏览器(在客户端)阻止,因为浏览器在发送下一个请求之前将等待来自服务器的前一个请求的响应。您可以通过在终端内使用print(request.client)来确认这一点,在那里您将看到所有传入请求的hostnameport号都相同(如果请求是从同一浏览器窗口/会话中打开的标签页发起的),因此,这些请求将按顺序处理,因为浏览器首先按顺序发送它们。为了解决这个问题,您可以选择以下方法之一:

  1. 重新加载相同的标签页(正在运行的),或
  2. 在隐身窗口中打开一个新的标签页,或
  3. 使用不同的浏览器/客户端发送请求,或
  4. 使用httpx库进行异步HTTP请求, 以及可等待asyncio.gather(), 它允许同时执行多个异步操作,然后按照传递给该函数的可等待对象(任务)的相同顺序返回结果列表(请参考此答案了解更多详情)。
  5. 示例

    import httpx
    import asyncio
    
    URLS = ['http://127.0.0.1:8000/ping'] * 2
    
    async def send(url, client):
        return await client.get(url, timeout=10)
    
    async def main():
        async with httpx.AsyncClient() as client:
            tasks = [send(url, client) for url in URLS]
            responses = await asyncio.gather(*tasks)
            print(*[r.json() for r in responses], sep='\n')
    
    asyncio.run(main())
    

    如果您需要调用不同的端点,这些端点可能需要不同的时间来处理请求,并且您希望在服务器返回响应时立即在客户端上打印出响应,而不是等待asyncio.gather()收集所有任务的结果并按照传递给send()函数的顺序打印出来,您可以将上面示例中的send()函数替换为下面显示的函数:

    async def send(url, client):
        res = await client.get(url, timeout=10)
        print(res.json())
        return res
    

异步/等待和阻塞的I/O密集或CPU密集操作

如果你需要使用async def(因为你可能需要在端点内await协程),但同时又有一些同步的阻塞的I/O密集或CPU密集操作(长时间运行的计算任务)会阻塞事件循环(实质上是整个服务器),并且不允许其他请求通过,例如:

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        res = cpu_bound_task(contents)  # this will block the event loop
    finally:
        await file.close()
    print("bye")
    return "pong"

然后:
  1. You should check whether you could change your endpoint's definition to normal def instead of async def. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you could instead declare the type of the endpoint's parameter as bytes (i.e., file: bytes = File()) and thus, FastAPI would read the file for you and you would receive the contents as bytes. Hence, there would be no need to use await file.read(). Please note that the above approach should work for small files, as the enitre file contents would be stored into memory (see the documentation on File Parameters); and hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can’t load a 50GB file), your application may end up crashing. Alternatively, you could call the .read() method of the SpooledTemporaryFile directly (which can be accessed through the .file attribute of the UploadFile object), so that again you don't have to await the .read() method—and as you can now declare your endpoint with normal def, each request will run in a separate thread (example is given below). For more details on how to upload a File, as well how Starlette/FastAPI uses SpooledTemporaryFile behind the scenes, please have a look at this answer and this answer.

    @app.post("/ping")
    def ping(file: UploadFile = File(...)):
        print("Hello")
        try:
            contents = file.file.read()
            res = cpu_bound_task(contents)
        finally:
            file.file.close()
        print("bye")
        return "pong"
    
  2. Use FastAPI's (Starlette's) run_in_threadpool() function from the concurrency module—as @tiangolo suggested here—which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".

    from fastapi.concurrency import run_in_threadpool
    
    res = await run_in_threadpool(cpu_bound_task, contents)
    
  3. Alternatively, use asyncio's loop.run_in_executor()—after obtaining the running event loop using asyncio.get_running_loop()—to run the task, which, in this case, you can await for it to complete and return the result(s), before moving on to the next line of code. Passing None as the executor argument, the default executor will be used; that is ThreadPoolExecutor:

    import asyncio
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, cpu_bound_task, contents)
    

    or, if you would like to pass keyword arguments instead, you could use a lambda expression (e.g., lambda: cpu_bound_task(some_arg=contents)), or, preferably, functools.partial(), which is specifically recommended in the documentation for loop.run_in_executor():

    import asyncio
    from functools import partial
    
    loop = asyncio.get_running_loop()
    res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
    

    You could also run your task in a custom ThreadPoolExecutor. For instance:

    import asyncio
    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents)
    

    In Python 3.9+, you could also use asyncio.to_thread() to asynchronously run a synchronous function in a separate thread—which, essentially, uses await loop.run_in_executor(None, func_call) under the hood, as can been seen in the implementation of asyncio.to_thread(). The to_thread() function takes the name of a blocking function to execute, as well as any arguments (*args and/or **kwargs) to the function, and then returns a coroutine that can be awaited. Example:

    import asyncio
    
    res = await asyncio.to_thread(cpu_bound_task, contents)
    
  4. ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel; especially, when one needs to perform CPU-bound operations, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__':.

    import concurrent.futures
    
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, contents) 
    
  5. Use more workers. For example, uvicorn main:app --workers 4 (if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".

  6. If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.


1
计算任务意味着CPU密集型负载。在CPython中,由于GIL只允许一个线程处于活动状态,因此线程对于CPU任务没有明显的提升。因此,在这里,既不会使用“def”路由,也不会使用“run_in_threadpool”。 - zhanymkanov
1
@zhanymkanov 感谢您的评论。我知道 Python 的 GIL,因此,我计划很快扩展上面的答案,使用 multiprocessing 提供更多解决方案。上述提到的选项 1(即增加 workers 的数量)已经是解决此问题的一种方法。无论如何,在外部线程池中运行这样的任务,然后等待它们完成,而不是直接调用它们 - 尽管不能真正实现并行处理 - 也比没有好,因为这样的任务否则会阻塞整个服务器。 - Chris
1
@bravmi 不客气。上面的相关部分已经更新,希望现在更清晰了。如需更多细节,请查看上面提供的链接。 - Chris
@Chris 只是为了明确起见。如果我使用更多的 N 个工作线程,那么异步路由将会有 N 个事件循环,这可能会被阻塞,对吗?另一个问题是,ThreadPoolExecutor 在多个工作线程之间共享,还是每个工作线程都有自己的 ThreadPoolExecutor 用于非异步路由?感谢澄清! - undefined
1
很遗憾在fastAPI官方文档中找不到这样一个很棒的答案,文档并没有清楚地描述这些内容。这将为很多人节省了很多时间。 - undefined
显示剩余7条评论

0
问题:
" ... 有什么问题吗? " 答案:
FastAPI文档明确表示该框架使用进程内任务(继承自Starlette)。这意味着所有这样的任务都竞争接收(不时)Python解释器GIL锁 - 这是一个有效地使所有量的Python解释器进程内线程
工作为一个-并且-只有一个工作-而其他所有人都在等待的MUTEX恐怖化全局解释器锁。
在细粒度上,您会看到结果——如果为第二个(手动从第二个FireFox选项卡启动的)到达的http请求生成另一个处理程序实际上比睡眠需要更长的时间,则GIL锁交错〜100[ms]时间量子轮询(所有等待一次可以工作〜100[ms]在每次GIL锁释放-获取轮盘之前进行下一轮)Python解释器内部工作不显示更多详细信息,您可以使用更多详细信息(取决于操作系统类型或版本)从here查看更多in-thread LoD,例如在执行异步装饰代码内部的情况如下:
import time
import threading
from   fastapi import FastAPI, Request

TEMPLATE = "INF[{0:_>20d}]: t_id( {1: >20d} ):: {2:}"

print( TEMPLATE.format( time.perf_counter_ns(),
                        threading.get_ident(),
                       "Python Interpreter __main__ was started ..."
                        )
...
@app.get("/ping")
async def ping( request: Request ):
        """                                __doc__
        [DOC-ME]
        ping( Request ):  a mock-up AS-IS function to yield
                          a CLI/GUI self-evidence of the order-of-execution
        RETURNS:          a JSON-alike decorated dict

        [TEST-ME]         ...
        """
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "Hello..."
                                )
        #------------------------------------------------- actual blocking work
        time.sleep( 5 )
        #------------------------------------------------- actual blocking work
        print( TEMPLATE.format( time.perf_counter_ns(),
                                threading.get_ident(),
                               "...bye"
                                )
        return { "ping": "pong!" }

最后但并非最不重要的,不要犹豫阅读更多关于所有其他鲨鱼线程代码可能遭受的问题...或者幕后甚至导致的问题...

广告备忘录

一种混合了GIL锁、基于线程池、异步装饰器、阻塞和事件处理的组合——一定会带来不确定性和HWY2HELL(通往地狱之路);o)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接