最小可重现示例:
import asyncio
import aiopg
from fastapi import FastAPI, WebSocket
dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"
app = FastAPI()
class ConnectionManager:
self.count_connections = 0
# other class functions and variables are taken from FastAPI docs
...
manager = ConnectionManager()
async def send_and_receive_data(websocket: WebSocket):
data = await websocket.receive_json()
await websocket.send_text('Thanks for the message')
# then process received data
# taken from official aiopg documentation
# the function listens to PostgreSQL notifications
async def listen(conn):
async with conn.cursor() as cur:
await cur.execute("LISTEN channel")
while True:
msg = await conn.notifies.get()
async def postgres_listen():
async with aiopg.connect(dsn) as listenConn:
listener = listen(listenConn)
await listener
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
manager.count_connections += 1
if manager.count_connections == 1:
await asyncio.gather(
send_and_receive_data(websocket),
postgres_listen()
)
else:
await send_and_receive_data(websocket)
问题描述:
我正在使用Vue.js、FastAPI和PostgreSQL构建一个应用程序。在这个例子中,我尝试使用Postgres的listen/notify并将其实现在websocket中。我还使用了许多常规的http端点以及websocket端点。
我想在启动FastAPI应用程序时运行一个永久的后台异步函数,然后向所有websocket客户端/连接发送消息。因此,当我使用
uvicorn main:app
时,它不仅应该运行FastAPI应用程序,还应该运行我的后台函数postgres_listen()
,当数据库中添加新行时通知所有websocket用户。我知道我可以使用
asyncio.create_task()
并将其放置在on_*
事件中,甚至可以在manager = ConnectionManager()
行之后放置它,但在我的情况下它不起作用!因为在任何http请求之后(例如read_root()
函数),我都会得到下面描述的相同错误。您看到我在我的
websocket_endpoint()
函数中使用了一种奇怪的方式来运行我的postgres_listen()
函数,只有当第一个客户端连接到websocket时才会运行/触发此函数。任何后续的客户端连接都不会再次运行/触发此函数。一切工作正常...直到第一个客户端/用户断开连接(例如,关闭浏览器选项卡)。当它发生时,我立即收到由psycopg2.OperationalError
引起的GeneratorExit
错误。Future exception was never retrieved
future: <Future finished exception=OperationalError('Connection closed')>
psycopg2.OperationalError: Connection closed
Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<Queue.get() done, defined at
/home/user/anaconda3/lib/python3.8/asyncio/queues.py:154> wait_for=<Future cancelled>>
错误来自于
listen()
函数。在此错误之后,我将不会收到任何来自数据库的通知,因为asyncio的Task
已被取消。 psycopg2
,aiopg
或asyncio
没有任何问题。问题在于我不知道该把postgres_listen()
函数放在哪里,以便在第一个客户端断开连接后不会被取消。据我的理解,我可以轻松编写一个Python脚本,连接到websocket(这样我就是websocket的第一个客户端),然后永远运行,这样我就不会再次遇到psycopg2.OperationalError
异常,但这似乎不太合适。
我的问题是:我应该把postgres_listen()
函数放在哪里,以便第一个连接到websocket的连接可以断开而不会产生后果?
P.S. asyncio.shield()
也不起作用。