如何在类中实现asyncio websockets?

25

我想通过asynciowebsockets连接到一个websocket,格式如下所示。我该如何实现?

from websockets import connect


class EchoWebsocket:

    def __init__(self):
        self.websocket = self._connect()

    def _connect(self):
        return connect("wss://echo.websocket.org")

    def send(self, message):
        self.websocket.send(message)

    def receive(self):
        return self.websocket.recv()

echo = EchoWebsocket()
echo.send("Hello!")
print(echo.receive())  # "Hello!"
1个回答

33

如何编写异步程序?

  1. 您应该使用 async 来定义异步函数
  2. 您应该使用 await 来调用异步函数
  3. 在启动异步程序时,您需要使用事件循环

除此以外,与正常的 Python 程序几乎一样。

import asyncio
from websockets import connect


class EchoWebsocket:
    async def __aenter__(self):
        self._conn = connect("wss://echo.websocket.org")
        self.websocket = await self._conn.__aenter__()        
        return self

    async def __aexit__(self, *args, **kwargs):
        await self._conn.__aexit__(*args, **kwargs)

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()


async def main():
    async with EchoWebsocket() as echo:
        await echo.send("Hello!")
        print(await echo.receive())  # "Hello!"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

输出:

Hello!

正如你所看到的,代码与你编写的几乎相同。

唯一的区别是websockets.connect被设计为异步上下文管理器(它使用__aenter____aexit__)。释放连接是必要的,并且还将帮助您在类初始化期间执行异步操作(因为我们没有__init__的异步版本)。

我建议你以相同的方式组织你的类。但是,如果由于某种原因你真的不想使用上下文管理器,你可以使用新的__await__方法进行异步初始化,以及其他一些异步函数来释放连接:

import sys
import asyncio
from websockets import connect


class EchoWebsocket:
    def __await__(self):
        # see: https://dev59.com/UFwX5IYBdhLWcg3wwx1S#33420721
        return self._async_init().__await__()

    async def _async_init(self):
        self._conn = connect("wss://echo.websocket.org")
        self.websocket = await self._conn.__aenter__()
        return self

    async def close(self):
        await self._conn.__aexit__(*sys.exc_info())

    async def send(self, message):
        await self.websocket.send(message)

    async def receive(self):
        return await self.websocket.recv()


async def main():
    echo = await EchoWebsocket()
    try:
        await echo.send("Hello!")
        print(await echo.receive())  # "Hello!"
    finally:
        await echo.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

你可以在 websockets文档中找到很多使用示例。


1
这个在收到回复后就关闭了。它如何继续接收消息? - Alex
1
@FeedTheWeb 在上下文管理器中一直接收回复。 - Mikhail Gerasimov
@MikhailGerasimov 我不理解上面的评论。您如何确保脚本不在一个回复后就结束了呢?谢谢。 - HJA24
如果您想要不断地发送/接收数据,通常需要启动一个服务器。例如,查看此处的代码片段,特别是第server.serve_forever()行。具体启动服务器的方式取决于您想要实现什么目标。与asyncio一起使用的流行高级库是aiohttp。它提供了一种处理许多协议(包括WebSockets)的方法。 - Mikhail Gerasimov
@MikhailGerasimov 你好,我正在遵循另一个来源的文档,他们没有使用 server.serve.forever(),所以我有点困惑。你能否看一下我的问题,链接在SO上 https://dev59.com/Irroa4cB1Zd3GeqPi151 - HJA24

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接