如何在Fastapi应用程序之外触发Fastapi Websocket的消息发送

3
我有一个像这样的WebSocket连接管理器:
class ConnectionManager:
    def __init__(self) -> None:
        self.connections  = {}
 
    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message

一个WebSocket路由:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str, redis :Annotated [Redis, Depends(get_redis)]):
    user_id = redis.get(token)
    if user_id:
        redis.expire(user_id)
    else:
        raise redis_error
    
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

我想要存储用户的连接,并且当一个Redis pubsub消息到达时,处理该消息并发送一个WebSocket消息给一些用户。处理该消息的模块不是Fastapi应用的一部分。
我尝试在Fastapi应用内部通过实现线程和异步操作来实现这一点,但是这些操作中断了Fastapi应用本身。
我应该如何在Fastapi应用之外触发WebSocket对象的发送消息?
我尝试过的方法:
redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
         # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

但是我从Redis得到了pubsub错误,它说我还没有订阅,除非我像这样做:

@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
    redis = Redis(redis_host, redis_port)
    pubsub = redis.pubsub()
    pubsub.subscribe("channel_signal")

    message = await  
    pubsub.get_message(ignore_subscribe_messages=True)
    if message is not None:
        # do something
    try:
        manager.connect(user_id, WebSocket)
    except WebSocketException:
        manager.disconnect(user_id)

但是这样会为每个连接到WebSocket的用户创建一个Redis连接,有没有办法为所有用户全局定义一个Redis连接?
更新1:
根据VonC的回答,我写了以下内容:
from fastapi import FastAPI, WebSocket, WebSocketException
from v1.endpoints.user.auth import router as auth_router
from v1.endpoints.signals import router as signals_router
from configs.connection_config import redis_host, redis_port
import redis.asyncio as aioredis
import threading
import asyncio
import uuid

app = FastAPI()
app.include_router(auth_router, prefix="/users/auth", tags = ["auth"])
app.include_router(signals_router, prefix="/signals", tags = ["signals"])



class ConnectionManager:
    last_message = ""
    def __init__(self) -> None:
        self.connections  = {}
 
    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket

    async def disconnect(self, user_id):
        websocket: WebSocket = self.connections[user_id]
        await websocket.close()
        del self.connections[user_id]

    async def send_messages(self, user_ids, message):
        for user_id in user_ids:
            websocket: WebSocket = self.connections[user_id]
            await websocket.send_json(message)

manager = ConnectionManager()

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    try:
        await manager.connect(str(uuid.uuid4()), websocket)
    except WebSocketException:
        await manager.disconnect(str(uuid.uuid4()))

redis_client = None
@app.on_event("startup")
async def startup_event_connect_redis():
    global redis_client
    redis_client = aioredis.Redis(host=redis_host, port=redis_port)

def listen_to_redis():
    pubsub = redis_client.pubsub()
    pubsub.subscribe("channel_signal")
    while True:
        message = pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            print(message["data"])

@app.on_event("startup")
async def startup_event_listen_redis():
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()

由于`aioredis`已被弃用,我正在使用`redispy`版本的`aioredis`。
我在这部分遇到了问题:
while True:
   message = pubsub.get_message(ignore_subscribe_messages=True)
   if message:
       print("hi")

如果消息总是成真,并且不断打印“hi”,那么任何事件都会无限触发。
更新2:
尽管由于与fastapi的另一个问题(我将在这里打开一个新的线程link)而无法完全测试答案,但更新中添加的内容实现了为所有用户定义全局redis连接并从fastapi生命周期中单独监听它。但是,我不得不使用实际的“aioredis”库,而不是“redispy”版本。

1
它将遵循参考文档中所见的相同模式 - 您向您的WebSocket端点发出请求,然后await redis.xread或类似的操作以从Redis流中获取消息;然后将其发送给相关的已连接客户端。 - undefined
这可能行得通,但是难道每个websocket连接都意味着也有一个redis连接吗?有没有办法为所有websocket连接全局定义一个redis连接? - undefined
我已经编辑了下面的回答以应对你的编辑。 - undefined
@VonC 谢谢你的回答!我更新了问题,因为我遇到了与这个问题相关的另一个问题,但是你的答案帮了我很多。 - undefined
非常欢迎您。希望我也能帮到您的其他问题,并已在那里发布了答案 - undefined
显示剩余2条评论
1个回答

4
要在您的FastAPI应用程序中实现全局Redis连接并在不同的WebSocket连接之间使用它,您应该考虑创建一个在FastAPI应用程序启动时初始化的共享Redis客户端。
为了独立于FastAPI请求-响应生命周期处理Redis pubsub消息,您可以创建一个单独的线程或进程来监听Redis pubsub消息,并相应地触发WebSocket消息。
from fastapi import FastAPI, WebSocket
from aioredis import create_redis_pool
import threading
import json
import asyncio

app = FastAPI()

# Connection manager class definition (place your existing implementation here)
class ConnectionManager:
    # same as your code

# Global variables
manager = ConnectionManager()
redis_client = None

# Step 0: WebSocket Connection Request (Client)
@app.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # (handle received messages if necessary)
    except:
        await manager.disconnect(user_id)

# Step 1: Global Redis Client and Connection Manager
@app.on_event("startup")
async def startup_event():
    global redis_client
    redis_client = await create_redis_pool("redis://localhost:6379")

# Step 2: Separate Thread for Redis Pub/Sub Listener
def listen_to_redis():
    pubsub = redis_client.pubsub()
    pubsub.subscribe("channel_signal")
    
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            process_redis_message(data)

@app.on_event("startup")
async def startup_event_2():
    # Starting the separate thread to listen to Redis Pub/Sub messages
    threading.Thread(target=listen_to_redis, daemon=True).start()

# Step 4: Processing Redis Messages
def process_redis_message(data):
    user_ids = data.get('user_ids')
    content = data.get('content')
    if user_ids and content:
        send_websocket_messages(user_ids, content)

# Step 5: Sending WebSocket Messages
def send_websocket_messages(user_ids, content):
    for user_id in user_ids:
        websocket = manager.connections.get(user_id)
        if websocket:
            asyncio.run(websocket.send_text(content))

通过Redis pubsub消息,从FastAPI应用程序外部发送消息到WebSocket连接的方式是按照以下工作流程进行的:
  External System           FastAPI App                       Redis Server
        |                         |                                   |
        |                         |                                   |
        |                         |--------(1) Initialize ----------->|
        |                         |        Redis Client               |
        |                         |                                   |
        |                         |-------(2) Start Thread ---------->|
        |                         |       (listen_to_redis)           |
        |                         |                                   |
 (0) WebSocket                    |                                   |
 Connection --------------------> |                                   |
 Request (Client)                 |                                   |
        |                         |                                   |
        |                         |                                   |
        |----(3) Publish Msg----->|                                   |
        |  to Redis channel       |                                   |
        |                         |                                   |
        |                         |                                   |
        |                         |<-------(4) Redis Listener --------|
        |                         |      (receive & process msg)      |
        |                         |                                   |
        |                         |                                   |
        |                         |------(5) WebSocket Msg ---------->|
        |                         |       (send to users)             |
        |                         |                                   |

WebSocket连接请求(客户端):客户端通过向FastAPI应用程序中的WebSocket端点发送请求来初始化WebSocket连接。
在此端点内部,您使用连接管理器来管理(存储、检索、删除)这些WebSocket连接。 这一步设置了管理WebSocket连接所需的基础设施,这些连接在第5步中用于向连接的客户端发送消息。
全局Redis客户端和连接管理器:在FastAPI应用程序启动时,建立了一个全局Redis客户端和WebSocket连接管理器。连接管理器跟踪所有活动的WebSocket连接。
在FastAPI应用程序启动时初始化Redis客户端:
(我使用aioredis,这是一个专为Python的asyncio设计的Redis客户端。根据您使用的Redis客户端库,调整Redis客户端的实例化和使用。)
为Redis Pub/Sub监听器启动单独的线程:启动一个单独的线程来持续监听Redis Pub/Sub消息。
该线程与FastAPI应用程序独立运行,不绑定到FastAPI应用程序中的任何特定路由或端点。 即使消息是从FastAPI应用程序外部发布的(可能是从具有对同一Redis实例的访问权限的另一个服务或应用程序),它也可以接收和处理消息。
从FastAPI应用程序外部向Redis发布消息:任何希望向FastAPI应用程序管理的WebSocket连接发送消息的外部系统或服务,都可以通过向FastAPI应用程序订阅的Redis频道发布消息来实现。
该外部系统/服务不需要直接与FastAPI应用程序交互;它只需要能够向Redis频道发布消息。
处理Redis消息:当消息发布到Redis频道时,运行listen_to_redis函数的单独线程会接收到该消息。
在此函数内部,您将实现处理Redis消息并确定应将消息发送到哪些WebSocket连接的user_ids的逻辑。
发送WebSocket消息:一旦确定了user_ids,listen_to_redis函数将调用连接管理器的send_messages方法,将消息发送到相应的WebSocket连接。
该方法会迭代确定的user_ids,并将消息发送到每个对应的WebSocket连接。

所以,要从FastAPI应用程序外部触发WebSocket消息发送:

  • 外部系统/服务将消息发布到Redis频道。
  • 在单独的线程中运行的listen_to_redis函数接收并处理此消息。
  • 处理后的消息然后通过连接管理器的send_messages方法发送到适当的WebSocket连接。

这种机制应该允许通过Redis pub/sub的中介将消息从FastAPI应用程序外部发送到WebSocket连接。


I have a problem with this part:

while True:
  message = pubsub.get_message(ignore_subscribe_messages=True)
  if message:
      print("hi")

if message always comes true and at continuously prints hi so any event would be triggered indefinitely.

listen_to_redis 函数中的 pubsub.get_message(ignore_subscribe_messages=True) 似乎总是返回一个非 None 的值,即使没有可用的消息,导致 if message: 语句的评估结果为 True
你的 listen_to_redis 函数似乎是同步的,而你的 FastAPI 应用程序的其他部分是异步的。 而 get_message()(消息循环)函数是非阻塞的,意味着如果没有可用的消息,它将立即返回 None,导致在 while True 循环中 CPU 使用率很高。
对于处理 WebSocket 和 Redis pub/sub 等异步操作,最好使用 asyncio。你可以像这样异步地监听 Redis 频道:
from asyncio import run

async def listen_to_redis():
    pubsub = await redis_client.subscribe("channel_signal")
    channel = pubsub[0]

    while await channel.wait_message():
        message = await channel.get(encoding="utf-8")
        print("Message:", message)

并且可以将用于监听Redis的启动事件也设置为异步,以确保与FastAPI更好地兼容。
@app.on_event("startup")
async def startup_event_listen_redis():
    asyncio.create_task(listen_to_redis())

目标是使您的Redis发布/订阅在FastAPI生态系统中以异步方式运行,并更好地适应其中。请注意,这将需要使用与FastAPI的异步能力兼容的异步Redis客户端。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接