Python 异步 IO 流 API

4
我正在寻找有关Python asyncio包中StreamReader和StreamWriter类的用法模式的“超越基础”的指导。我试图使用protobuf构建一个有状态的服务器,并使用自定义协议。我应该将子类化StreamReader和StreamWriter来管理从protobuf字节的序列化吗?然后,我可以在读取器上提供read_message函数。我知道我可以复制streams.start_server中的代码并提供自己的StreamReader,但是如何设置我的StreamWriter呢?如果您有任何指针或示例,请不吝赐教。

我意识到这是一个广泛的问题,但是没有评论的负评并不能帮助更好地阐述它。 - MarkNS
2个回答

3

我建议您不要从StreamReader/StreamWriter派生,而是使用自己的类并具有类似的API。例如,我在aiozmq库中实现了这一点。


感谢你的建议,安德鲁。最终我决定使用子类化(如我在答案中所述)。希望这不会给我带来麻烦,但似乎我只需编写较少的代码即可实现我的目标。 - MarkNS

3
我发现子类化asyncio.streams库的类相对比较简单。
start_server函数从tcp server示例中提取:
@asyncio.coroutine
def start_server(self, loop):
    def factory():
        reader = QbpStreamReader()
        return QbpStreamReaderProtocol(reader, self._accept_client)

    logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
    self.server = yield from loop.create_server(factory, self.host, self.port)

为了构建自己的StreamWriter,需要对StreamReaderProtocol进行子类化。除此之外,这与库函数相同。

class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
    def connection_made(self, transport):
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is not None:
            self._stream_writer = QbpStreamWriter(transport, self,
                                                  self._stream_reader,
                                                  self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)

对于外发消息:

class QbpStreamWriter(streams.StreamWriter):
    def write_msg(self, msg):
        # data = serialise message
        self.write(data)

对于收到的消息:

class QbpStreamReader(streams.StreamReader):
    @asyncio.coroutine
    def read_msg(self):
        data = yield from self.readexactly(header_length)
        # msg_type, msg_length = unpack header
        data = yield from self.readexactly(msg_length)
        return build_message(msg_type, data)

希望能对某些人有所帮助


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接