Python Flask,如何检测来自前端Javascript的SSE客户端断开连接

27

也许这是Flask的一个问题,服务器端没有处理断开连接事件的方法。

在Response类中,有一个名为"call_on_close"的方法,我们可以添加一个没有参数的函数(如on_close()),当响应对象的close方法被调用时,它将被触发,但是当我在Javascript中从客户端调用EventSource.close()时,它并不会发生。

服务器端代码:

from flask import Response
r = Response(stream(), ...)
r.call_on_close(on_close)
return r 

def on_close():
  print "response is closed!"

def stream():
  ...  # subscribe to redis
  for message in pubsub.listen():
    ....
    yield 'data: %s\n\n' % message
在客户端上:添加卸载处理程序以使用SSE加载页面。
$(window).unload(
  function() {
    sse.close();
  }
}

有什么不对的地方吗?

欢迎提出任何代码建议或解决方案。

提前致谢!


2
你是否找到了解决方案? - luckydonald
3个回答

9
生成器接收到 GeneratorExit 异常时,就会退出。例如:
def stream():
    try:
        i = 0
        while True:
            yield 'Hello {}!'.format(i)
            i += 1
            time.sleep(1)
    finally:
        # Called after a GeneratorExit, cleanup here
        i = 0


@app.route('/messages')
def messages():
    return Response(stream(), content_type='text/event-stream')

将产生一个无限流的"Hello!",并且您会知道何时完成,以便可以运行清理代码。如果生成器阻塞线程,则需要以某种方式解除阻塞(例如推送虚拟项),以便可以关闭生成器。


这个特定的例子似乎可以工作,但不能与非阻塞的Redis pubsub.get_message()循环一起使用。 - 2080
1
从你的回答中,我猜测你从未使用过 yield if not message。我猜想这是因为你需要 yield 让调用者有机会停止生成器,并让生成器知道它已经停止。 - Lonami
是的,没错!这也是为什么使用阻塞式监听函数是不可能的原因,除非有频繁的ping/alive消息触发yield。 - 2080

2

在扩展@Lonami的答案时,当使用返回数据的非阻塞函数时,需要使用yield

def stream():
    try:
        pubsub = red.pubsub()
        pubsub.subscribe('chat')
        #for message in pubsub.listen():#This doesn't work because it's blocking
        while True:
            message = pubsub.get_message()

            if not message:
                # The yield is necessary for this to work!
                # In my case I always send JSON encoded data
                # An empty response might work, too.
                yield "data: {}\n\n"
                sleep(0.1)
                continue

            # If the nonblocking get_message() returned something, proceed normally
            yield 'data: %s\n\n' % message["data"]
    finally:
        print("CLOSED!")
        # Your closing logic here (e.g. marking the user as offline in your database)


@app.route('/messages')
def messages():
    return Response(stream(), content_type='text/event-stream')

但这不是一个非阻塞的解决方案。你正在使用 time.sleep(0.1)。一旦SSE被初始化,你将无法向服务器发送其他请求。 你应该将 def stream() 更改为 async def stream(),并且使用 await asyncio.sleep(0.1) 代替 time.sleep(0.1) - Desprit
实际上,我上面的解决方案可能不适用于 Flask。我正在使用异步的 FastAPI。 - Desprit

2

我曾经在Rails Live Controllers中遇到了类似的问题。问题在于该框架似乎无法检测到连接关闭,直到它尝试向客户端发送事件。

一种方法是向客户端发送定期的“心跳”事件。我目前在我的Rails项目中成功使用了这个方法,间隔为60秒。我有一个单独的线程将这些心跳事件“emit”到Redis中,而我的控制器已订阅了这些事件。

另一种方法是在Redis pubsub块中包裹一个超时(再次,比如60秒)。然后向客户端发送心跳事件,接着是另一个pubsub调用。这种方法的缺点是,在未订阅时可能会错过某些事件。

有关线程方法的更多信息,请参见此处: Redis + ActionController::Live threads not dying


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接