我有一个像这样的WebSocket连接管理器:
一个WebSocket路由:
我想要存储用户的连接,并且当一个Redis pubsub消息到达时,处理该消息并发送一个WebSocket消息给一些用户。处理该消息的模块不是Fastapi应用的一部分。
我尝试在Fastapi应用内部通过实现线程和异步操作来实现这一点,但是这些操作中断了Fastapi应用本身。
我应该如何在Fastapi应用之外触发WebSocket对象的发送消息?
我尝试过的方法:
但是我从Redis得到了pubsub错误,它说我还没有订阅,除非我像这样做:
但是这样会为每个连接到WebSocket的用户创建一个Redis连接,有没有办法为所有用户全局定义一个Redis连接?
更新1:
根据VonC的回答,我写了以下内容:
由于`aioredis`已被弃用,我正在使用`redispy`版本的`aioredis`。
我在这部分遇到了问题:
如果消息总是成真,并且不断打印“hi”,那么任何事件都会无限触发。
更新2:
尽管由于与fastapi的另一个问题(我将在这里打开一个新的线程link)而无法完全测试答案,但更新中添加的内容实现了为所有用户定义全局redis连接并从fastapi生命周期中单独监听它。但是,我不得不使用实际的“aioredis”库,而不是“redispy”版本。
class ConnectionManager:
def __init__(self) -> None:
self.connections = {}
async def connect(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.connections[user_id] = websocket
async def disconnect(self, user_id):
websocket: WebSocket = self.connections[user_id]
await websocket.close()
del self.connections[user_id]
async def send_messages(self, user_ids, message):
for user_id in user_ids:
websocket: WebSocket = self.connections[user_id]
await websocket.send_json(message
一个WebSocket路由:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str, redis :Annotated [Redis, Depends(get_redis)]):
user_id = redis.get(token)
if user_id:
redis.expire(user_id)
else:
raise redis_error
try:
manager.connect(user_id, WebSocket)
except WebSocketException:
manager.disconnect(user_id)
我想要存储用户的连接,并且当一个Redis pubsub消息到达时,处理该消息并发送一个WebSocket消息给一些用户。处理该消息的模块不是Fastapi应用的一部分。
我尝试在Fastapi应用内部通过实现线程和异步操作来实现这一点,但是这些操作中断了Fastapi应用本身。
我应该如何在Fastapi应用之外触发WebSocket对象的发送消息?
我尝试过的方法:
redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
message = await
pubsub.get_message(ignore_subscribe_messages=True)
if message is not None:
# do something
try:
manager.connect(user_id, WebSocket)
except WebSocketException:
manager.disconnect(user_id)
但是我从Redis得到了pubsub错误,它说我还没有订阅,除非我像这样做:
@router.websocket("/ws/{token}")
async def ws(websocket: WebSocket, token: str):
redis = Redis(redis_host, redis_port)
pubsub = redis.pubsub()
pubsub.subscribe("channel_signal")
message = await
pubsub.get_message(ignore_subscribe_messages=True)
if message is not None:
# do something
try:
manager.connect(user_id, WebSocket)
except WebSocketException:
manager.disconnect(user_id)
但是这样会为每个连接到WebSocket的用户创建一个Redis连接,有没有办法为所有用户全局定义一个Redis连接?
更新1:
根据VonC的回答,我写了以下内容:
from fastapi import FastAPI, WebSocket, WebSocketException
from v1.endpoints.user.auth import router as auth_router
from v1.endpoints.signals import router as signals_router
from configs.connection_config import redis_host, redis_port
import redis.asyncio as aioredis
import threading
import asyncio
import uuid
app = FastAPI()
app.include_router(auth_router, prefix="/users/auth", tags = ["auth"])
app.include_router(signals_router, prefix="/signals", tags = ["signals"])
class ConnectionManager:
last_message = ""
def __init__(self) -> None:
self.connections = {}
async def connect(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.connections[user_id] = websocket
async def disconnect(self, user_id):
websocket: WebSocket = self.connections[user_id]
await websocket.close()
del self.connections[user_id]
async def send_messages(self, user_ids, message):
for user_id in user_ids:
websocket: WebSocket = self.connections[user_id]
await websocket.send_json(message)
manager = ConnectionManager()
@app.websocket("/ws")
async def ws(websocket: WebSocket):
try:
await manager.connect(str(uuid.uuid4()), websocket)
except WebSocketException:
await manager.disconnect(str(uuid.uuid4()))
redis_client = None
@app.on_event("startup")
async def startup_event_connect_redis():
global redis_client
redis_client = aioredis.Redis(host=redis_host, port=redis_port)
def listen_to_redis():
pubsub = redis_client.pubsub()
pubsub.subscribe("channel_signal")
while True:
message = pubsub.get_message(ignore_subscribe_messages=True)
if message:
print(message["data"])
@app.on_event("startup")
async def startup_event_listen_redis():
# Starting the separate thread to listen to Redis Pub/Sub messages
threading.Thread(target=listen_to_redis, daemon=True).start()
由于`aioredis`已被弃用,我正在使用`redispy`版本的`aioredis`。
我在这部分遇到了问题:
while True:
message = pubsub.get_message(ignore_subscribe_messages=True)
if message:
print("hi")
如果消息总是成真,并且不断打印“hi”,那么任何事件都会无限触发。
更新2:
尽管由于与fastapi的另一个问题(我将在这里打开一个新的线程link)而无法完全测试答案,但更新中添加的内容实现了为所有用户定义全局redis连接并从fastapi生命周期中单独监听它。但是,我不得不使用实际的“aioredis”库,而不是“redispy”版本。
await redis.xread
或类似的操作以从Redis流中获取消息;然后将其发送给相关的已连接客户端。 - undefined