FastAPI:永久运行的后台任务,监听Postgres通知并将数据发送到WebSocket

3

最小可重现示例:

import asyncio
import aiopg
from fastapi import FastAPI, WebSocket


dsn = "dbname=aiopg user=aiopg password=passwd host=127.0.0.1"
app = FastAPI()


class ConnectionManager:
    self.count_connections = 0
    # other class functions and variables are taken from FastAPI docs
    ...


manager = ConnectionManager()


async def send_and_receive_data(websocket: WebSocket):
    data = await websocket.receive_json()
    await websocket.send_text('Thanks for the message')
    # then process received data


# taken from official aiopg documentation
# the function listens to PostgreSQL notifications
async def listen(conn):
    async with conn.cursor() as cur:
        await cur.execute("LISTEN channel")
        while True:
            msg = await conn.notifies.get()


async def postgres_listen():
    async with aiopg.connect(dsn) as listenConn:
        listener = listen(listenConn)
        await listener


@app.get("/")
def read_root():
    return {"Hello": "World"}


@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    manager.count_connections += 1

    if manager.count_connections == 1:
        await asyncio.gather(
            send_and_receive_data(websocket),
            postgres_listen()
        )
    else:
        await send_and_receive_data(websocket)

问题描述:
我正在使用Vue.js、FastAPI和PostgreSQL构建一个应用程序。在这个例子中,我尝试使用Postgres的listen/notify并将其实现在websocket中。我还使用了许多常规的http端点以及websocket端点。
我想在启动FastAPI应用程序时运行一个永久的后台异步函数,然后向所有websocket客户端/连接发送消息。因此,当我使用uvicorn main:app时,它不仅应该运行FastAPI应用程序,还应该运行我的后台函数postgres_listen(),当数据库中添加新行时通知所有websocket用户。
我知道我可以使用asyncio.create_task()并将其放置在on_*事件中,甚至可以在manager = ConnectionManager()行之后放置它,但在我的情况下它不起作用!因为在任何http请求之后(例如read_root()函数),我都会得到下面描述的相同错误。
您看到我在我的websocket_endpoint()函数中使用了一种奇怪的方式来运行我的postgres_listen()函数,只有当第一个客户端连接到websocket时才会运行/触发此函数。任何后续的客户端连接都不会再次运行/触发此函数。一切工作正常...直到第一个客户端/用户断开连接(例如,关闭浏览器选项卡)。当它发生时,我立即收到由psycopg2.OperationalError引起的GeneratorExit错误。
Future exception was never retrieved
future: <Future finished exception=OperationalError('Connection closed')>
psycopg2.OperationalError: Connection closed
Task was destroyed but it is pending!
task: <Task pending name='Task-18' coro=<Queue.get() done, defined at 
/home/user/anaconda3/lib/python3.8/asyncio/queues.py:154> wait_for=<Future cancelled>>

错误来自于listen()函数。在此错误之后,我将不会收到任何来自数据库的通知,因为asyncio的Task已被取消。 psycopg2aiopgasyncio没有任何问题。问题在于我不知道该把postgres_listen()函数放在哪里,以便在第一个客户端断开连接后不会被取消。据我的理解,我可以轻松编写一个Python脚本,连接到websocket(这样我就是websocket的第一个客户端),然后永远运行,这样我就不会再次遇到psycopg2.OperationalError异常,但这似乎不太合适。

我的问题是:我应该把postgres_listen()函数放在哪里,以便第一个连接到websocket的连接可以断开而不会产生后果?

P.S. asyncio.shield()也不起作用。

1个回答

3

我也在Github上回答了这个问题,这里重新发布一下。

可以在这里找到一个可用的示例: https://github.com/JarroVGIT/fastapi-github-issues/tree/master/5015

# app.py
import queue
from typing import Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from asyncio import Queue, Task
import asyncio

import uvicorn
import websockets

class Listener:
    def __init__(self):
        #Every incoming websocket conneciton adds it own Queue to this list called 
        #subscribers.
        self.subscribers: list[Queue] = []
        #This will hold a asyncio task which will receives messages and broadcasts them 
        #to all subscribers.
        self.listener_task: Task

    async def subscribe(self, q: Queue):
        #Every incoming websocket connection must create a Queue and subscribe itself to 
        #this class instance 
        self.subscribers.append(q)


    async def start_listening(self):
        #Method that must be called on startup of application to start the listening 
        #process of external messages.
        self.listener_task = asyncio.create_task(self._listener())

    async def _listener(self) -> None:
        #The method with the infinite listener. In this example, it listens to a websocket
        #as it was the fastest way for me to mimic the 'infinite generator' in issue 5015
        #but this can be anything. It is started (via start_listening()) on startup of app.
        async with websockets.connect("ws://localhost:8001") as websocket:
            async for message in websocket:
                for q in self.subscribers:
                    #important here: every websocket connection has its own Queue added to
                    #the list of subscribers. Here, we actually broadcast incoming messages
                    #to all open websocket connections.
                    await q.put(message)

    async def stop_listening(self):
        #closing off the asyncio task when stopping the app. This method is called on 
        #app shutdown
        if self.listener_task.done():
            self.listener_task.result()
        else:
            self.listener_task.cancel()

    async def receive_and_publish_message(self, msg: Any):
        #this was a method that was called when someone would make a request 
        #to /add_item endpoint as part of earlier solution to see if the msg would be 
        #broadcasted to all open websocket connections (it does)
        for q in self.subscribers:
            try:
                q.put_nowait(str(msg))
            except Exception as e:
                raise e

    #Note: missing here is any disconnect logic (e.g. removing the queue from the list of subscribers
    # when a websocket connection is ended or closed.)

        
global_listener = Listener()

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    await global_listener.start_listening()
    return

@app.on_event("shutdown")
async def shutdown_event():
    await global_listener.stop_listening()
    return


@app.get('/add_item/{item}')
async def add_item(item: str):
    #this was a test endpoint, to see if new items where actually broadcasted to all 
    #open websocket connections.
    await global_listener.receive_and_publish_message(item)
    return {"published_message:": item}

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    q: asyncio.Queue = asyncio.Queue()
    await global_listener.subscribe(q=q)
    try:
        while True:
            data = await q.get()
            await websocket.send_text(data)
    except WebSocketDisconnect:
            return


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

由于我没有订阅消息流的权限,因此我创建了一个快速脚本生成websocket,以便app.py可以无限期地监听该流并模拟您的使用情况。

# generator.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import uvicorn


app = FastAPI()

@app.websocket("/")
async def ws(websocket: WebSocket):
    await websocket.accept()
    i = 0
    while True:
        try:
            await websocket.send_text(f"Hello - {i}")
            await asyncio.sleep(2)
            i+=1
        except WebSocketDisconnect:
            pass

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8001)

app.py将监听一个websocket并将所有传入的消息发布到app.py中的websockets连接。

generator.py是一个简单的FastAPI应用程序,具有一个websocket(我们上面的示例app.py正在监听),该websocket每2秒向它接收到的每个连接发出一条消息。

要尝试这个:

  • 启动generator.py(例如,在工作文件夹中输入python3 generator.py)
  • 启动app.py(在VScode中调试模式或与上述相同)
  • 使用多个客户端侦听http://localhost:8000/ws(= app.py中的端点),您将看到它们都加入了相同的消息连续。

注意:许多此逻辑灵感来自Broadcaster(一个Python模块)。


有没有办法我可以向一个连接发送消息而不是所有连接? - rhaegnar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接