一旦握手完成,协议从HTTP
更改为WebSocket
。如果你尝试在websocket端点内引发一个HTTP
异常,你会发现这是不可能的,或者返回一个HTTP
响应(例如,return JSONResponse(...status_code=404)
),你将得到一个内部服务器错误,即ASGI callable returned without sending handshake
。
选项1
因此,如果您想在协议升级之前拥有某种检查机制,则需要使用
中间件
,如下所示。在中间件内部,您不能引发异常,但可以返回响应(即
Response
、
JSONResponse
、
PlainTextResponse
等),这实际上是FastAPI在幕后
处理异常的方式。作为参考,请查看此
文章以及
此处的讨论。
async def is_user_allowed(request: Request):
print(request['headers'])
print(request.client)
return False
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
if not await is_user_allowed(request):
return JSONResponse(content={"message": "User not allowed"}, status_code=404)
response = await call_next(request)
return response
或者,如果您愿意,您可以使用is_user_allowed()
方法引发自定义异常,并需要使用try-except
块捕获:
class UserException(Exception):
def __init__(self, message):
self.message = message
super().__init__(message)
async def is_user_allowed(request: Request):
raise UserException(message="User not allowed.")
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
try:
await is_user_allowed(request)
except UserException as e:
return JSONResponse(content={"message": f'{e.message}'}, status_code=404)
response = await call_next(request)
return response
选项2
然而,如果你需要使用websocket
实例来完成这个任务,你可以采用与上面相同的逻辑,但是将websocket
实例传递给is_user_allowed()
方法,并在websocket终端内部捕获异常(灵感来自this)。
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
try:
await is_user_allowed(ws)
await handle_conn(ws)
except UserException as e:
await ws.send_text(e.message)
await ws.close()
在上述代码中,您必须首先接受连接,以便在出现异常时调用
close()
方法来终止连接。如果您愿意,可以使用以下内容。但是,在
except
块中的
return
语句将抛出内部服务器错误(即,
ASGI callable returned without sending handshake.
),如前所述。
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
try:
await is_user_allowed(ws)
except UserException as e:
return
await ws.accept()
await handle_conn(ws)
@app.middleware("http")
)。 - Chris