Sanic Web服务器:Websocket处理程序在返回时关闭套接字;循环中断其他请求处理程序

3
场景:我有一个 Sanic Web 服务器提供简单网站的服务。该网站基本上是一个带有 Vue 模板支持的 HTML 大型数据表格。由于表格条目每几分钟就会更改,因此数据通过 WebSocket 在更改时传递。同时大约有 2000 名用户使用。我尝试实现发布/订阅架构。
问题:我的 WebSocket 在 Sanic 处理程序返回后立即关闭。我可以在内部设置循环以保持处理程序开放。但是保持 2000 个处理程序处于打开状态听起来不太好... 另外,已打开的处理程序表现得很奇怪。一个线程或一个小线程池应该能胜任这项工作。也许我误解了 Sanic 文档,需要一些设计建议。
我尝试过的事情: - 增加超时设置的时间足够长 - 尝试在 Sanic 中使用各种其他 WebSocket 设置 - 让我的客户端 JS 在 onmessage 上返回 false(Javascript websockets closing immediately after opening) - 在传递后将 ws 引用设置为 null
Sanic Web 服务器的索引:
@app.route('/')
async def serve_index(request):
    return await file(os.path.join(os.path.dirname(__file__), 'index.html'))

Index.html的JS:

var app = new Vue({
    el: '#app',
        data() {
            manydata0: 0,
            manydata1: 0,
            ws: null,
        }
    },
    methods: {
        update: function (json_data) {
            json = JSON.parse(json_data);
            this.manydata0 = json['data0'];
            this.manydata1 = json['data1'];
        }
    },
    created: function () {
        this.ws = new WebSocket('ws://' + document.domain + ':' + location.port + '/reload');
        messages = document.createElement('ul');
        this.ws.onmessage = function (event) {
            console.log("new data")
            app.update(event.data);
        return false;
    };
    document.body.appendChild(messages);
    this.ws.onclose = function (event) {
        console.log("closed :(")
    };

Sanic Web服务器的Websocket处理程序(第1版,套接字立即关闭):
@app.websocket('/reload')
async def feed(request, ws):
    #time.sleep(42) # this causes the websocket to be created and closed on client side 42 seconds after my request
    await ws.send(Path(json).read_text()) # serve initial data
    connected_clients.append(ws) # subscribe to websocket list. another thread will read list entries and serve them updates

Sanic Web服务器的Websocket处理程序(第二版,处理程序会阻塞其他请求处理程序)。
@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
        except Exception as e:
            print("Exception while checking file: ", e)
    # this stops the server to handle other @app.routes like css, fonts, favicon

Sanic Web服务器的Websocket处理程序(第三版,不需要recv())。
@app.websocket('/reload')
async def feed(request, ws):
    mod_time = 0
    while True:
        try:
            stat = os.stat(json)
            if mod_time != stat.st_mtime:
                await ws.send(Path(json).read_text())
                await recv() # if the client sends from time to time all is fine
        except Exception as e:
            print("Exception while checking file: ", e)

最后两个代码片段并没有太大的区别。我添加了一个ws.recv(),并从客户端发送了一些适合的东西(例如在间隔中),然后一切正常。然后发送了css、字体和favicon。但这可能不是有意的,对吧?这应该不能很好地扩展,对吧?
总的来说,这对我来说没有太多意义。我误解了什么?
1个回答

3
我是Sanic核心开发者之一。
首先,以发布订阅类型的架构为例,这里有一个我准备的代码片段。我认为它可能会有所帮助。
我的基本想法是创建一个单一的 Feed 对象,在自己的任务中循环查找事件。在我的情况下,它是从发布订阅接收信息。在您的情况下,应该检查JSON文档上的时间。
然后,当 Feed.receiver 触发了一个事件时,它会向所有正在侦听的客户端发送ping请求。
websocket 处理程序内部,您需要保持其处于打开状态。如果不这样做,连接将关闭。如果您不关心从客户端接收信息,则不需要使用 await recv()
因此,在您的情况下,使用非常简单的逻辑,我会执行以下操作。 这是未经测试的代码,可能需要进行一些调整
import os
import random
import string
from functools import partial
from pathlib import Path

from sanic import Sanic

import asyncio
import websockets
from dataclasses import dataclass, field
from typing import Optional, Set

app = Sanic(__name__)

FILE = "/tmp/foobar"
TIMEOUT = 10
INTERVAL = 20


def generate_code(length=12, include_punctuation=False):
    characters = string.ascii_letters + string.digits
    if include_punctuation:
        characters += string.punctuation
    return "".join(random.choice(characters) for x in range(length))


@dataclass
class Client:
    interface: websockets.server.WebSocketServerProtocol = field(repr=False)
    sid: str = field(default_factory=partial(generate_code, 36))

    def __hash__(self):
        return hash(str(self))

    async def keep_alive(self) -> None:
        while True:
            try:
                try:
                    pong_waiter = await self.interface.ping()
                    await asyncio.wait_for(pong_waiter, timeout=TIMEOUT)
                except asyncio.TimeoutError:
                    print("NO PONG!!")
                    await self.feed.unregister(self)
                else:
                    print(f"ping: {self.sid} on <{self.feed.name}>")
                await asyncio.sleep(INTERVAL)
            except websockets.exceptions.ConnectionClosed:
                print(f"broken connection: {self.sid} on <{self.feed.name}>")
                await self.feed.unregister(self)
                break

    async def shutdown(self) -> None:
        self.interface.close()

    async def run(self) -> None:
        try:
            self.feed.app.add_task(self.keep_alive())
            while True:
                pass
        except websockets.exceptions.ConnectionClosed:
            print("connection closed")
        finally:
            await self.feed.unregister(self)


class Feed:
    app: Sanic
    clients: Set[Client]
    cached = None

    def __init__(self, app: Sanic):
        self.clients = set()
        self.app = app

    @classmethod
    async def get(cls, app: Sanic):
        is_existing = False

        if cls.cached:
            is_existing = True
            feed = cls.cached
        else:
            feed = cls(app)
            cls.cached = feed

        if not is_existing:
            feed.app.add_task(feed.receiver())

        return feed, is_existing

    async def receiver(self) -> None:
        print("Feed receiver started")
        mod_time = 0
        while True:
            try:
                stat = os.stat(FILE)
                print(f"times: {mod_time} | {stat.st_mtime}")
                if mod_time != stat.st_mtime:
                    content = self.get_file_contents()
                    for client in self.clients:
                        try:
                            print(f"\tSending to {client.sid}")
                            await client.interface.send(content)
                        except websockets.exceptions.ConnectionClosed:
                            print(f"ConnectionClosed. Client {client.sid}")
            except Exception as e:
                print("Exception while checking file: ", e)

    async def register(
        self, websocket: websockets.server.WebSocketServerProtocol
    ) -> Optional[Client]:
        client = Client(interface=websocket)
        print(f">>> register {client}")

        client.feed = self
        self.clients.add(client)

        # Send initial content
        content = self.get_file_contents()
        client.interface.send(content)

        print(f"\nAll clients\n{self.clients}\n\n")

        return client

    async def unregister(self, client: Client) -> None:
        print(f">>> unregister {client} on <{self.name}>")
        if client in self.clients:
            await client.shutdown()
            self.clients.remove(client)
            print(f"\nAll remaining clients\n{self.clients}\n\n")

    def get_file_contents(self):
        return Path(FILE).read_text()


@app.websocket("/reload")
async def feed(request, ws):
    feed, is_existing = await Feed.get(app)

    client = await feed.register(ws)
    await client.run()


if __name__ == "__main__":
    app.run(debug=True, port=7777)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接