Python: 同步程序中的Websockets

7
我有一个普通的同步Python程序,需要能够从WebSockets读取数据并使用该数据更新GUI。然而,asyncio的问题一直困扰着我。
我需要创建一个模块,其功能为:
  1. 接受多个订阅源的多个订阅
  2. 在有数据时向请求者发送更新
  3. 每个URL仅打开一个WebSocket连接
  4. 如果WebSocket关闭,则重置它
这是我的现有代码,但它在许多方面都失败了:
  1. run_forever() 意味着在订阅完成之前循环被卡住,然后handle() 在错误的while循环中卡住。
  2. 当Socket(s)关闭时,它似乎不想重新启动,因为websocket对象没有connected属性(没有"s"的websocket有,但我不清楚差异,也找不到在线信息)
  3. 我绝对不确定我的方法是否正确。
我已经为此奋斗了几周。希望能得到一些指导。
class WSClient():
    subscriptions = set()
    connections = {}
    started = False

    def __init__(self):
        self.loop = asyncio.get_event_loop()

    def start(self):
        self.started = True
        self.loop.run_until_complete(self.handle())
        self.loop.run_until_forever()  # problematic, because it does not allow new subscribe() events

    async def handle(self):
        while len(self.connections) > 0:
            # listen to every websocket
            futures = [self.listen(self.connections[url]) for url in self.connections]
            done, pending = await asyncio.wait(futures)

            # the following is apparently necessary to avoid warnings
            # about non-retrieved exceptions etc
            try:
                data, ws = done.pop().result()
            except Exception as e:
                print("OTHER EXCEPTION", e)

            for task in pending:
                task.cancel()

    async def listen(self, ws):
        try:
            async for data in ws:
                data = json.loads(data)
                # call the subscriber (listener) back when there's data
                [s.listener._handle_result(data) for s in self.subscriptions if s.ws == ws]
        except Exception as e:
            print('ERROR LISTENING; RESTARTING SOCKET', e)
            await asyncio.sleep(2)
            self.restart_socket(ws)

    def subscribe(self, subscription):
        task = self.loop.create_task(self._subscribe(subscription))
        asyncio.gather(task)

        if not self.started:
            self.start()

    async def _subscribe(self, subscription):
        try:
            ws = self.connections.get(subscription.url, await websockets.connect(subscription.url))
            await ws.send(json.dumps(subscription.sub_msg))

            subscription.ws = ws
            self.connections[subscription.url] = ws
            self.subscriptions.add(subscription)
        except Exception as e:
            print("ERROR SUBSCRIBING; RETRYING", e)
            await asyncio.sleep(2)
            self.subscribe(subscription)

    def restart_socket(self, ws):
        for s in self.subscriptions:
            if s.ws == ws and not s.ws.connected:
                print(s)
                del self.connections[s.url]
                self.subscribe(s)
2个回答

5
我有一个标准的同步python程序,需要能够从WebSockets读取数据并更新GUI上的数据。但是,asyncio常常使我遇到问题。
由于您提到了GUI,那么它可能不是一个“标准的同步python程序”。通常,GUI程序具有非阻塞事件驱动的主线程,允许并发用户行为和回调。这非常类似于asyncio,并且通常是asyncio与GUI一起工作的常见方式,可以使用GUI特定的事件循环替换asyncio中的默认事件循环,因此您的asyncio协程只运行在GUI事件循环中,可以避免调用run_forever()来阻塞一切。
另一种方法是在单独的线程中运行asyncio事件循环,以便您的程序可以同时等待websocket数据和用户点击。我已将您的代码重写如下:
import asyncio
import threading
import websockets
import json


class WSClient(threading.Thread):

    def __init__(self):
        super().__init__()
        self._loop = None
        self._tasks = {}
        self._stop_event = None

    def run(self):
        self._loop = asyncio.new_event_loop()
        self._stop_event = asyncio.Event(loop=self._loop)
        try:
            self._loop.run_until_complete(self._stop_event.wait())
            self._loop.run_until_complete(self._clean())
        finally:
            self._loop.close()

    def stop(self):
        self._loop.call_soon_threadsafe(self._stop_event.set)

    def subscribe(self, url, sub_msg, callback):
        def _subscribe():
            if url not in self._tasks:
                task = self._loop.create_task(
                    self._listen(url, sub_msg, callback))
                self._tasks[url] = task

        self._loop.call_soon_threadsafe(_subscribe)

    def unsubscribe(self, url):
        def _unsubscribe():
            task = self._tasks.pop(url, None)
            if task is not None:
                task.cancel()

        self._loop.call_soon_threadsafe(_unsubscribe)

    async def _listen(self, url, sub_msg, callback):
        try:
            while not self._stop_event.is_set():
                try:
                    ws = await websockets.connect(url, loop=self._loop)
                    await ws.send(json.dumps(sub_msg))
                    async for data in ws:
                        data = json.loads(data)

                        # NOTE: please make sure that `callback` won't block,
                        # and it is allowed to update GUI from threads.
                        # If not, you'll need to find a way to call it from
                        # main/GUI thread (similar to `call_soon_threadsafe`)
                        callback(data)
                except Exception as e:
                    print('ERROR; RESTARTING SOCKET IN 2 SECONDS', e)
                    await asyncio.sleep(2, loop=self._loop)
        finally:
            self._tasks.pop(url, None)

    async def _clean(self):
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), loop=self._loop)

谢谢。我还有几个更紧急的事情要处理,但会尝试您的解决方案并告诉您它的效果。 - bluppfisk
感谢您的建议。通过一处小改动,它现在至少与服务器进行了通信:我在订阅时创建了新的事件循环,而不是在运行时创建。否则它会为空。然而,尽管WSClient在单独的线程中运行并使用GLib.idle_add()向GUI发送更新信号,但仍然会在某种程度上阻塞GUI。事实上,GUI 的任何部分都没有被初始化,所以我仍然在想这是怎么回事。 - bluppfisk
它正在从WebSocket打印响应,但GUI根本没有显示。这可能是因为在启动ws客户端后调用了Gtk.main()。但是,如果我在之前启动它,主循环将不允许运行其他循环。因此,也许您的第一个解决方案是正确的,只是我还不知道如何操作。 - bluppfisk

-2
你可以尝试使用Tornado和Autobahn-Twisted来处理WebSockets。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接