According to FastAPI's documentation:

When you declare a path operation function with normal def instead of async def, it is run in an external threadpool that is then awaited, instead of being called directly (as it would block the server).

Also, as described here:

If you are using a third-party library that communicates with something (a database, an API, the file system, etc.) and doesn't have support for using await (this is currently the case for most database libraries), then declare your path operation functions as normally, with just def.

If your application (somehow) doesn't have to communicate with anything else and wait for it to respond, use async def.

If you just don't know, use normal def.

Note: You can mix def and async def in your path operation functions as much as you need and define each one using the best option for you. FastAPI will do the right thing with them.

Anyway, in any of the cases above, FastAPI will still work asynchronously and be extremely fast.

But by following the steps above, it will be able to do some performance optimizations.
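For instance, a minimal sketch of mixing both styles in the same application could look like the following (the paths and payloads below are just placeholders, not taken from the documentation):

import asyncio
import time

from fastapi import FastAPI

app = FastAPI()

@app.get("/sync")
def sync_endpoint():
    time.sleep(1)  # blocking call; acceptable here, as this endpoint runs in an external threadpool
    return {"message": "from a def endpoint"}

@app.get("/async")
async def async_endpoint():
    await asyncio.sleep(1)  # non-blocking; the event loop can serve other requests meanwhile
    return {"message": "from an async def endpoint"}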
Thus, in FastAPI, endpoints defined with def (in the context of asynchronous programming, a function defined with just def is called a synchronous function) run in a separate thread from an external threadpool that is then awaited; hence, FastAPI will still work asynchronously. In other words, the server will process requests to such endpoints concurrently. Endpoints defined with async def, on the other hand, run in the event loop—on the main (single) thread—meaning the server will also process requests to such endpoints concurrently/asynchronously, as long as there is an await call to non-blocking I/O-bound operations inside those async def endpoints/routes, such as waiting for (1) data from the client to be sent through the network, (2) contents of a file on disk to be read, or (3) a database operation to finish (see here). If, however, an endpoint defined with async def does not await anything inside, in order to give up time for other tasks in the event loop to run (e.g., requests to the same or other endpoints, background tasks, etc.), each request to such an endpoint will have to be completely finished (i.e., exit the endpoint) before returning control back to the event loop and allowing other tasks to run. In other words, in that case, the server will process requests sequentially.

Note that the same concept applies not only to FastAPI endpoints, but also to a StreamingResponse's generator function (see the StreamingResponse class implementation), as well as Background Tasks (see the BackgroundTask class implementation); hence, after reading this answer, you should be able to decide whether to define a FastAPI endpoint, a StreamingResponse generator, or a background task function with def or async def.
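To illustrate the StreamingResponse point, here is a minimal, self-contained sketch (the paths and payloads are placeholders, not taken from the question): a def-style generator is iterated by Starlette in an external threadpool, whereas an async def generator runs in the event loop and must therefore only await non-blocking operations.

import asyncio
import time

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream-sync")
def stream_sync():
    def gen():
        for i in range(3):
            time.sleep(1)  # blocking sleep; acceptable here, as the generator is iterated in a threadpool
            yield f"chunk {i}\n"

    return StreamingResponse(gen(), media_type="text/plain")

@app.get("/stream-async")
async def stream_async():
    async def gen():
        for i in range(3):
            await asyncio.sleep(1)  # must be non-blocking, or the event loop would be blocked
            yield f"chunk {i}\n"

    return StreamingResponse(gen(), media_type="text/plain")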
The keyword await (which only works within an async def function) passes function control back to the event loop. In other words, it suspends the execution of the surrounding coroutine (a coroutine object is the result of calling an async def function) and tells the event loop to let something else run, until the awaited task is completed. Note that just because you may define a custom function with async def and then await it inside your async def endpoint, it doesn't mean that your code will work asynchronously, if that custom function contains, for example, calls to time.sleep(), CPU-bound tasks, non-async I/O libraries, or any other blocking call that is incompatible with asynchronous Python code. In FastAPI, for example, when using the async methods of UploadFile, such as await file.read() and await file.write(), FastAPI/Starlette, behind the scenes, actually runs such methods of File objects in an external threadpool (using the async run_in_threadpool() function) and awaits it; otherwise, such methods/operations would block the event loop. You can find out more by having a look at the implementation of the UploadFile class.
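As a quick illustration of that point, the hypothetical helper below (not part of the question) is defined with async def, yet awaiting it would still block the event loop for the whole duration of the call, because time.sleep() never yields control back to the loop:

import time

from fastapi import FastAPI

app = FastAPI()

async def fetch_data():
    # Despite the async def, this is still a blocking call:
    # time.sleep() does not give control back to the event loop.
    time.sleep(5)
    return {"data": "example"}

@app.get("/blocking")
async def blocking_endpoint():
    return await fetch_data()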
Please note that async does not mean parallel, but concurrently. Asynchronous code with async and await is many times summarised as using coroutines. Coroutines are collaborative (or cooperatively multitasked), meaning that "at any given time, a program with coroutines is running only one of its coroutines, and this running coroutine suspends its execution only when it explicitly requests to be suspended" (see here and here for more info on coroutines). As described in this article:

Specifically, whenever execution of a currently-running coroutine reaches an await expression, the coroutine may be suspended, and another previously-suspended coroutine may resume execution if what it was suspended on has since returned a value. Suspension can also happen when an async for block requests the next value from an asynchronous iterator or when an async with block is entered or exited, as these operations use await under the hood.
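A minimal standalone sketch (not part of the article quoted above) of this cooperative behaviour: the two coroutines below only hand over control at their await points, so their output interleaves one step at a time.

import asyncio

async def worker(name):
    for i in range(2):
        print(f"{name} step {i}")
        # The coroutine suspends here, letting the other worker run.
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
# Prints: A step 0, B step 0, A step 1, B step 1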
However, if a blocking I/O-bound or CPU-bound operation is directly executed/called inside an async def function/endpoint, it will block the main thread (and hence the event loop). Therefore, a blocking operation such as time.sleep() inside an async def endpoint will block the entire server (as in the code example provided in your question). Thus, if your endpoint is not going to make any async calls, you could declare it with just def instead, and it would then run in an external threadpool that would be awaited, as explained earlier (more solutions are provided in the sections that follow). Example:
@app.get("/ping")
def ping(request: Request):
print("Hello")
time.sleep(5)
print("bye")
return "pong"
Otherwise, if the functions that you have to execute inside the endpoint are async functions whose results you need to await, you should define your endpoint with async def. To demonstrate this, the example below uses the asyncio.sleep() function (from the asyncio library), which provides a non-blocking sleep operation. The await asyncio.sleep() call will suspend the execution of the surrounding coroutine (until the sleep operation is completed), thus allowing other tasks in the event loop to run. Similar examples can also be found here and here.
import asyncio

@app.get("/ping")
async def ping(request: Request):
    print("Hello")
    await asyncio.sleep(5)
    print("bye")
    return "pong"
Either of the endpoints above will print the specified messages to the screen in the same order as mentioned in your question—if two requests arrive at about the same time—that is:
Hello
Hello
bye
bye
Important Note

When you call your endpoint for the second (third, and so on) time, please remember to do so from a tab that is isolated from the browser's main session; otherwise, succeeding requests (i.e., the ones coming after the first) will be blocked by the browser (on the client side), as the browser will wait for the server's response to the previous request before sending the next one. You can confirm this by using print(request.client) inside the endpoint, where you will see that the hostname and port numbers are the same for all incoming requests (if the requests were initiated from tabs opened in the same browser window/session); hence, those requests are processed sequentially, because the browser sends them sequentially in the first place. To solve this, you could either:

- Reload the same tab (as is running), or
- Open a new tab in an incognito window, or
- Use a different browser/client to send the request, or
- Use the httpx library to make asynchronous HTTP requests, along with the awaitable asyncio.gather(), which allows executing multiple asynchronous operations concurrently and then returns a list of results in the same order the awaitables (tasks) were passed to that function (have a look at this answer for more details).

Example:
import httpx
import asyncio

URLS = ['http://127.0.0.1:8000/ping'] * 2

async def send(url, client):
    return await client.get(url, timeout=10)

async def main():
    async with httpx.AsyncClient() as client:
        tasks = [send(url, client) for url in URLS]
        responses = await asyncio.gather(*tasks)
        print(*[r.json() for r in responses], sep='\n')

asyncio.run(main())
If you have to call different endpoints that may take different amounts of time to process a request, and you would like to print each response out on the client side as soon as it is returned from the server—instead of waiting for asyncio.gather() to gather the results of all tasks and print them out in the same order the tasks were passed to the send() function—you could replace the send() function of the example above with the one shown below:
async def send(url, client):
    res = await client.get(url, timeout=10)
    print(res.json())
    return res
async/await and Blocking I/O-bound or CPU-bound Operations

If you are required to use async def (as you might need to await for coroutines inside your endpoint), but also have some synchronous blocking I/O-bound or CPU-bound operation (long-running computation task) that will block the event loop (essentially, the entire server) and won't let other requests go through, for example:
@app.post("/ping")
async def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = await file.read()
res = cpu_bound_task(contents)
finally:
await file.close()
print("bye")
return "pong"
then:
You should check whether you could change your endpoint's definition to normal def instead of async def. For example, if the only method in your endpoint that has to be awaited is the one reading the file contents (as you mentioned in the comments section below), you could instead declare the type of the endpoint's parameter as bytes (i.e., file: bytes = File()) and thus, FastAPI would read the file for you and you would receive the contents as bytes. Hence, there would be no need to use await file.read() (a sketch of this approach is given further below as well). Please note that this approach should work for small files, as the entire file contents will be stored in memory (see the documentation on File Parameters); hence, if your system does not have enough RAM available to accommodate the accumulated data (if, for example, you have 8GB of RAM, you can't load a 50GB file), your application may end up crashing. Alternatively, you could call the .read() method of the SpooledTemporaryFile directly (which can be accessed through the .file attribute of the UploadFile object), so that again you don't have to await the .read() method—and as you can now declare your endpoint with normal def, each request will run in a separate thread (an example is given below). For more details on how to upload a File, as well as how Starlette/FastAPI uses SpooledTemporaryFile behind the scenes, please have a look at this answer and this answer.
@app.post("/ping")
def ping(file: UploadFile = File(...)):
print("Hello")
try:
contents = file.file.read()
res = cpu_bound_task(contents)
finally:
file.file.close()
print("bye")
return "pong"
Use FastAPI's (Starlette's) run_in_threadpool() function from the concurrency module—as @tiangolo suggested here—which "will run the function in a separate thread to ensure that the main thread (where coroutines are run) does not get blocked" (see here). As described by @tiangolo here, "run_in_threadpool is an awaitable function, the first parameter is a normal function, the next parameters are passed to that function directly. It supports both sequence arguments and keyword arguments".
from fastapi.concurrency import run_in_threadpool
res = await run_in_threadpool(cpu_bound_task, contents)
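Applied to the earlier async def endpoint, a sketch might look like the following (cpu_bound_task is, again, assumed to be defined elsewhere):

from fastapi import FastAPI, File, UploadFile
from fastapi.concurrency import run_in_threadpool

app = FastAPI()

@app.post("/ping")
async def ping(file: UploadFile = File(...)):
    print("Hello")
    try:
        contents = await file.read()
        # Run the blocking/CPU-bound function in a separate thread,
        # so that the event loop is not blocked while it executes.
        res = await run_in_threadpool(cpu_bound_task, contents)
    finally:
        await file.close()
    print("bye")
    return "pong"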
Alternatively, use asyncio's loop.run_in_executor()—after obtaining the running event loop using asyncio.get_running_loop()—to run the task, which, in this case, you can await for it to complete and return the result(s), before moving on to the next line of code. Passing None as the executor argument, the default executor will be used; that is, ThreadPoolExecutor:
import asyncio
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, cpu_bound_task, contents)
or, if you would like to pass keyword arguments instead, you could use a lambda expression (e.g., lambda: cpu_bound_task(some_arg=contents)), or, preferably, functools.partial(), which is specifically recommended in the documentation for loop.run_in_executor():
import asyncio
from functools import partial
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(None, partial(cpu_bound_task, some_arg=contents))
You could also run your task in a custom ThreadPoolExecutor. For instance:
import asyncio
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
In Python 3.9+, you could also use asyncio.to_thread() to asynchronously run a synchronous function in a separate thread—which, essentially, uses await loop.run_in_executor(None, func_call) under the hood, as can be seen in the implementation of asyncio.to_thread(). The to_thread() function takes the name of a blocking function to execute, as well as any arguments (*args and/or **kwargs) to the function, and then returns a coroutine that can be awaited. Example:
import asyncio
res = await asyncio.to_thread(cpu_bound_task, contents)
ThreadPoolExecutor will successfully prevent the event loop from being blocked, but won't give you the performance improvement you would expect from running code in parallel; especially when one needs to perform CPU-bound operations, such as the ones described here (e.g., audio or image processing, machine learning, and so on). It is thus preferable to run CPU-bound tasks in a separate process—using ProcessPoolExecutor, as shown below—which, again, you can integrate with asyncio, in order to await it to finish its work and return the result(s). As described here, on Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses, etc. Basically, your code must be under if __name__ == '__main__':.
import concurrent.futures

loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
    res = await loop.run_in_executor(pool, cpu_bound_task, contents)
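A complete, self-contained sketch of how such an endpoint could be structured with the main-module guard mentioned above might look like this (the /compute path and the cpu_bound_task body are placeholders for illustration only):

import asyncio
import concurrent.futures

import uvicorn
from fastapi import FastAPI

app = FastAPI()

def cpu_bound_task(n: int) -> int:
    # Placeholder for a heavy computation.
    return sum(i * i for i in range(n))

@app.get("/compute")
async def compute():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, cpu_bound_task, 10_000_000)
    return {"result": res}

if __name__ == '__main__':
    # Protecting the entry point is required (especially on Windows, which uses
    # the spawn start method), to avoid recursive spawning of subprocesses.
    uvicorn.run(app, host='0.0.0.0', port=8000)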
Use more workers. For example, uvicorn main:app --workers 4 (if you are using Gunicorn as a process manager with Uvicorn workers, please have a look at this answer). Note: Each worker "has its own things, variables and memory". This means that global variables/objects, etc., won't be shared across the processes/workers. In this case, you should consider using a database storage, or Key-Value stores (Caches), as described here and here. Additionally, note that "if you are consuming a large amount of memory in your code, each process will consume an equivalent amount of memory".
If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery, as described in FastAPI's documentation.
As a side note, in the example from your question, replacing time.sleep() with await asyncio.sleep() would also allow the requests to be handled concurrently.