我正在单独的线程中运行WebSocket监听器。 我想连接到WebSocket,然后执行以下操作:
while True:
msg = sock.wait_for_message()
f(msg)
即没有async/asyncio。
这样做很蠢吗?有没有方法可以实现?
loop=
输入参数已被弃用。我不确定具体意思是什么,作者是打算在将来删除它还是提出替代方案。(请参见这里)import websockets
import websockets.server
import queue
import asyncio
import time
class SynchronousWebsocketServer:
"""
Synchronous wrapper around asynchronous websockets server by Pier-Yves Lessard
"""
def __init__(self, connect_callback=None, disconnect_callback=None):
self.rxqueue = queue.Queue()
self.txqueue = queue.Queue()
self.loop = asyncio.new_event_loop()
self.ws_server = None
self.connect_callback = connect_callback
self.disconnect_callback = disconnect_callback
# Executed for each websocket
async def server_routine(self, websocket, path):
if self.connect_callback is not None:
self.connect_callback(websocket)
try:
async for message in websocket:
self.rxqueue.put( (websocket, message) ) # Possible improvement : Handle queue full scenario.
except (websockets.exceptions.ConnectionClosedOK, websockets.exceptions.ConnectionClosedError):
pass
finally:
if self.disconnect_callback is not None:
self.disconnect_callback(websocket)
def process_tx_queue(self):
while not self.txqueue.empty():
(websocket, message) = self.txqueue.get()
try:
self.loop.run_until_complete(websocket.send(message))
except (websockets.exceptions.ConnectionClosedOK, websockets.exceptions.ConnectionClosedError):
pass # Client is disconnected. Disconnect callback not called yet.
def process(self, nloop=3) -> None:
self.process_tx_queue()
for i in range(nloop): # Process events few times to make sure we handles events generated within the loop
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
def start(self, host, port) -> None:
# Warning. websockets source code says that loop argument might be deprecated.
self.ws_server = websockets.serve(self.server_routine, host, port, loop=self.loop)
self.loop.run_until_complete(self.ws_server) # Initialize websockets async server
def stop(self) -> None:
if self.ws_server is not None:
self.ws_server.ws_server.close()
self.loop.run_until_complete(asyncio.ensure_future(self.ws_server.ws_server.wait_closed(), loop=self.loop))
self.loop.stop()
if __name__ == '__main__':
# Demo on how to use the SynchronousWebsocketServer
clients = set()
def connect_callback(websocket):
clients.add(websocket)
print('New client. Websocket ID = %s. We now have %d clients' % (id(websocket), len(clients)))
def diconnect_callback(websocket):
clients.remove(websocket)
print('Client diconnected. Websocket ID = %s. %d clients remaining' % (id(websocket), len(clients)))
server = SynchronousWebsocketServer(connect_callback=connect_callback, disconnect_callback=diconnect_callback)
print("Starting server")
server.start('localhost', 5555)
print("Server started")
while True: # Synchornous loop
try:
server.process()
if not server.rxqueue.empty():
websocket, message = server.rxqueue.get_nowait() # Non-blocking read. We need to keep call "server.process()"
print("Received message from websocket ID=%s. Echoing %s " % (id(websocket), message))
server.txqueue.put((websocket, message)) # echo
time.sleep(0.005)
except KeyboardInterrupt:
break
print("Stopping server")
server.stop()
print("Server stopped")
ws1 = new WebSocket('ws://localhost:5555')
ws2 = new WebSocket('ws://localhost:5555')
ws3 = new WebSocket('ws://localhost:5555')
ws2.send('Hello World')
ws2.close()
ws1.send('Hi')
// Killed server here.
Starting server
Server started
New client. Websocket ID = 140097014008368. We now have 1 clients
New client. Websocket ID = 140097014017824. We now have 2 clients
New client. Websocket ID = 140097014018928. We now have 3 clients
Received message from websocket ID=140097014017824. Echoing Hello World
Client diconnected. Websocket ID = 140097014017824. 2 clients remaining
Received message from websocket ID=140097014008368. Echoing Hi
^C
Stopping server
Client diconnected. Websocket ID = 140097014018928. 1 clients remaining
Client diconnected. Websocket ID = 140097014008368. 0 clients remaining
Server stopped