不使用async/asyncio实现Python WebSocket监听器

3

我正在单独的线程中运行WebSocket监听器。 我想连接到WebSocket,然后执行以下操作:

while True:
    msg = sock.wait_for_message()
    f(msg)

即没有async/asyncio。

这样做很蠢吗?有没有方法可以实现?


2
不,这一点也不愚蠢。我不喜欢或者想在Python中使用异步编程。我就是讨厌它。我宁愿用老式的方式来做事情。目前正在自己研究这个问题... - VocoJax
2个回答

4

3
我知道这个问题是关于一个websocket客户端的。但是对于websocket服务器,我也有同样的疑问。作为一个对asyncio不熟悉且喜欢老式同步设计的新手,我很困惑地发现同步服务器设计的热情如此之低。我在网上找到的所有示例都假设整个应用程序都是异步的,这可能并不正确。
因此,我提出了将异步websockets模块封装成同步类的解决方案。希望能对某些人有所帮助。
思路如下:
  1. 为websockets模块创建一个专门的事件循环
  2. 在每次循环迭代时,将一个“停止”事件入队,以更新事件循环一次。这可以多次执行而不会出现问题。(参见此链接
  3. 将数据放入队列,供调用层读取/写入websockets
注意:查看websockets模块的源代码,似乎loop=输入参数已被弃用。我不确定具体意思是什么,作者是打算在将来删除它还是提出替代方案。(请参见这里
import websockets
import websockets.server
import queue
import asyncio
import time

class SynchronousWebsocketServer:
    """
    Synchronous wrapper around asynchronous websockets server by Pier-Yves Lessard
    """
    def __init__(self, connect_callback=None, disconnect_callback=None):
        self.rxqueue = queue.Queue()
        self.txqueue = queue.Queue()
        self.loop = asyncio.new_event_loop()
        self.ws_server = None
        self.connect_callback = connect_callback
        self.disconnect_callback = disconnect_callback

    # Executed for each websocket
    async def server_routine(self, websocket, path):
        if self.connect_callback is not None:
            self.connect_callback(websocket)

        try:
            async for message in websocket:
                self.rxqueue.put( (websocket, message) )   # Possible improvement : Handle queue full scenario.
        except (websockets.exceptions.ConnectionClosedOK, websockets.exceptions.ConnectionClosedError):
            pass
        finally:
            if self.disconnect_callback is not None:
                self.disconnect_callback(websocket)

    def process_tx_queue(self):
        while not self.txqueue.empty():
            (websocket, message) = self.txqueue.get()
            try:
                self.loop.run_until_complete(websocket.send(message))
            except (websockets.exceptions.ConnectionClosedOK, websockets.exceptions.ConnectionClosedError):
                pass    # Client is disconnected. Disconnect callback not called yet.

    def process(self, nloop=3) -> None:
        self.process_tx_queue()
        for i in range(nloop):  # Process events few times to make sure we handles events generated within the loop
            self.loop.call_soon(self.loop.stop)
            self.loop.run_forever()

    def start(self, host, port) -> None:
        # Warning. websockets source code says that loop argument might be deprecated. 
        self.ws_server = websockets.serve(self.server_routine, host, port, loop=self.loop)
        self.loop.run_until_complete(self.ws_server)    # Initialize websockets async server

    def stop(self) -> None:
        if self.ws_server is not None:
            self.ws_server.ws_server.close()
            self.loop.run_until_complete(asyncio.ensure_future(self.ws_server.ws_server.wait_closed(), loop=self.loop))
            self.loop.stop()


if __name__ == '__main__':
    # Demo on how to use the SynchronousWebsocketServer
    clients = set()
    def connect_callback(websocket):
        clients.add(websocket)
        print('New client. Websocket ID = %s. We now have %d clients' % (id(websocket), len(clients)))
    
    def diconnect_callback(websocket):
        clients.remove(websocket)
        print('Client diconnected. Websocket ID = %s. %d clients remaining' % (id(websocket), len(clients)))

    server = SynchronousWebsocketServer(connect_callback=connect_callback, disconnect_callback=diconnect_callback)
    print("Starting server")
    server.start('localhost', 5555)
    print("Server started")

    while True: # Synchornous loop
        try:
            server.process()
            if not server.rxqueue.empty():
                websocket, message = server.rxqueue.get_nowait()   # Non-blocking read. We need to keep call "server.process()" 
                print("Received message from websocket ID=%s. Echoing %s " % (id(websocket), message))
                server.txqueue.put((websocket, message))    # echo
            time.sleep(0.005)
        except KeyboardInterrupt:
            break
        
    print("Stopping server")
    server.stop()
    print("Server stopped")

为了测试这个服务器,我在浏览器控制台中执行了以下操作:
ws1 = new WebSocket('ws://localhost:5555')  
ws2 = new WebSocket('ws://localhost:5555')
ws3 = new WebSocket('ws://localhost:5555')

ws2.send('Hello World')
ws2.close()
ws1.send('Hi') 

// Killed server here.

这是生成的输出:
Starting server
Server started
New client. Websocket ID = 140097014008368. We now have 1 clients
New client. Websocket ID = 140097014017824. We now have 2 clients
New client. Websocket ID = 140097014018928. We now have 3 clients
Received message from websocket ID=140097014017824. Echoing Hello World 
Client diconnected. Websocket ID = 140097014017824. 2 clients remaining
Received message from websocket ID=140097014008368. Echoing Hi 
^C
Stopping server
Client diconnected. Websocket ID = 140097014018928. 1 clients remaining
Client diconnected. Websocket ID = 140097014008368. 0 clients remaining
Server stopped

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接