我有一个普通的同步Python程序,需要能够从WebSockets读取数据并使用该数据更新GUI。然而,asyncio的问题一直困扰着我。
我需要创建一个模块,其功能为:
我需要创建一个模块,其功能为:
- 接受多个订阅源的多个订阅
- 在有数据时向请求者发送更新
- 每个URL仅打开一个WebSocket连接
- 如果WebSocket关闭,则重置它
run_forever()
意味着在订阅完成之前循环被卡住,然后handle()
在错误的while
循环中卡住。- 当Socket(s)关闭时,它似乎不想重新启动,因为websocket对象没有
connected
属性(没有"s"的websocket有,但我不清楚差异,也找不到在线信息) - 我绝对不确定我的方法是否正确。
class WSClient():
subscriptions = set()
connections = {}
started = False
def __init__(self):
self.loop = asyncio.get_event_loop()
def start(self):
self.started = True
self.loop.run_until_complete(self.handle())
self.loop.run_until_forever() # problematic, because it does not allow new subscribe() events
async def handle(self):
while len(self.connections) > 0:
# listen to every websocket
futures = [self.listen(self.connections[url]) for url in self.connections]
done, pending = await asyncio.wait(futures)
# the following is apparently necessary to avoid warnings
# about non-retrieved exceptions etc
try:
data, ws = done.pop().result()
except Exception as e:
print("OTHER EXCEPTION", e)
for task in pending:
task.cancel()
async def listen(self, ws):
try:
async for data in ws:
data = json.loads(data)
# call the subscriber (listener) back when there's data
[s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
except Exception as e:
print('ERROR LISTENING; RESTARTING SOCKET', e)
await asyncio.sleep(2)
self.restart_socket(ws)
def subscribe(self, subscription):
task = self.loop.create_task(self._subscribe(subscription))
asyncio.gather(task)
if not self.started:
self.start()
async def _subscribe(self, subscription):
try:
ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
await ws.send(json.dumps(subscription.sub_msg))
subscription.ws = ws
self.connections[subscription.url] = ws
self.subscriptions.add(subscription)
except Exception as e:
print("ERROR SUBSCRIBING; RETRYING", e)
await asyncio.sleep(2)
self.subscribe(subscription)
def restart_socket(self, ws):
for s in self.subscriptions:
if s.ws == ws and not s.ws.connected:
print(s)
del self.connections[s.url]
self.subscribe(s)
GLib.idle_add()
向GUI发送更新信号,但仍然会在某种程度上阻塞GUI。事实上,GUI 的任何部分都没有被初始化,所以我仍然在想这是怎么回事。 - bluppfiskGtk.main()
。但是,如果我在之前启动它,主循环将不允许运行其他循环。因此,也许您的第一个解决方案是正确的,只是我还不知道如何操作。 - bluppfisk