如何在Python(FastAPI)中调用更新后端状态的函数时发送服务器端事件

11

我有以下问题:给定一个运行fastapi的后端,具有流式传输端点,用于更新前端,我希望每次调用更新后端状态的函数时(可以是按计划作业或触发状态更新的不同端点),都能发送这些更新。

我想实现的简化版本如下:

from fastapi import FastAPI
from starlette.responses import StreamingResponse

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        # HERE: notify waiting stream endpoint

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            # HERE lies the question: wait for state to be update
            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")

我希望这个程序永远运行下去,每当状态更新时,event_stream 就会解除阻塞并发送消息。

我已经查看了线程和asyncio,但我有一种感觉,我错过了如何在Python中完成这个简单概念的东西。

2个回答

26

FastAPI基于Starlette开发,而Starlette可以使用Server-Sent Events插件。

import asyncio
import uvicorn
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse

MESSAGE_STREAM_DELAY = 1  # second
MESSAGE_STREAM_RETRY_TIMEOUT = 15000  # milisecond
app = FastAPI()


@app.get('/stream')
async def message_stream(request: Request):
    def new_messages(): ...
    async def event_generator():
        while True:
            # If client was closed the connection
            if await request.is_disconnected():
                break

            # Checks for new messages and return them to client if any
            if new_messages():
                yield {
                        "event": "new_message",
                        "id": "message_id",
                        "retry": MESSAGE_STREAM_RETRY_TIMEOUT,
                        "data": "message_content"
                }

            await asyncio.sleep(MESSAGE_STREAM_DELAY)

    return EventSourceResponse(event_generator())


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)


很好的回答,这里的new_messages()函数是做什么用的? - GooJ
@GooJ 检查你想要发送的数据是否准备好。 - Saber Hayati
1
抱歉,我看到这是目的,但我想知道它是如何实现的?因为new_message()看起来像是一个占位符函数。 - GooJ
@GooJ 是的,这是一个占位符。这完全取决于您想让这个函数做什么。例如,在聊天应用程序中,它可以检查数据库是否有新消息,如果有新消息,则通过SSE连接将该消息发送给客户端。此外,如果您打算每隔X秒定期发送数据,则根本不需要此功能。 - Saber Hayati

3
我找到的最简单的解决方法是使用threading.Condition
因此,它变成了:
import threading

from fastapi import FastAPI
from starlette.responses import StreamingResponse

condition = threading.Condition()

class State:
    def __init__(self):
        self.messages = []

    def update(self, new_messages):
        self.messages = new_messages
        with condition:
            condition.notify()

app = FastAPI()

state = State()

@app.get('/stream')
def stream():
    def event_stream():
        while True:
            with condition:
                condition.wait()

            for message in state.messages:
                yield 'data: {}\n\n'.format(json.dumps(message))
    return StreamingResponse(event_stream(), media_type="text/event-stream")





4
如果在“event_stream”注意到新消息之前,“update”被调用两次,这段代码会丢失消息吗?请参考queue作为条件变量的更高级替代方案。 update可以将消息放入队列中,“event_stream”从队列中取出它们。您将获得与当前相同的行为,但不会丢失消息,并且代码更易于理解。 - user4815162342
@vini 这个问题被标记为 asyncio,但你实现了一个同步的答案。再看一下 @saber-hayati 的回答。 - jwm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接