我发现子类化asyncio.streams库的类相对比较简单。
下面的start_server方法改编自asyncio文档中的TCP服务器示例:
@asyncio.coroutine
def start_server(self, loop):
    """Start the QBP TCP server on ``self.host``:``self.port``.

    Args:
        loop: Event loop whose ``create_server`` is used to create the
            listening server.

    The object returned by ``loop.create_server`` is stored on
    ``self.server``; nothing is returned to the caller.
    """
    # NOTE(review): ``asyncio.coroutine`` was removed in Python 3.11. On a
    # newer interpreter this method has to become ``async def`` with
    # ``await`` in place of ``yield from``.

    def factory():
        # Protocol factory handed to create_server: builds a fresh reader
        # and wraps it in the protocol that hands clients to
        # ``self._accept_client``.
        reader = QbpStreamReader()
        return QbpStreamReaderProtocol(reader, self._accept_client)

    logger.info("QbpServer starting at tcp://%s:%s", self.host, self.port)
    self.server = yield from loop.create_server(factory, self.host, self.port)
为了让连接使用自己的StreamWriter子类,需要对StreamReaderProtocol进行子类化并重写connection_made。除了把StreamWriter换成QbpStreamWriter之外,这个方法与标准库中的实现相同。
class QbpStreamReaderProtocol(streams.StreamReaderProtocol):
    """StreamReaderProtocol that hands a ``QbpStreamWriter`` to the callback.

    NOTE(review): this relies on private attributes of the base class
    (``_stream_reader``, ``_client_connected_cb``, ``_stream_writer``,
    ``_loop``), so it is tied to the asyncio version it was written
    against. Re-check it against the installed ``asyncio.streams`` source
    when upgrading Python.
    """

    def connection_made(self, transport):
        """Attach ``transport`` to the reader and notify the client callback.

        Args:
            transport: The transport for the newly established connection.
        """
        self._stream_reader.set_transport(transport)
        if self._client_connected_cb is not None:
            # The only intentional difference from a plain StreamWriter
            # setup: the writer is a QbpStreamWriter.
            self._stream_writer = QbpStreamWriter(transport, self,
                                                  self._stream_reader,
                                                  self._loop)
            res = self._client_connected_cb(self._stream_reader,
                                            self._stream_writer)
            # A coroutine callback must be scheduled explicitly; a plain
            # callable has already run by this point.
            if coroutines.iscoroutine(res):
                self._loop.create_task(res)
对于外发消息:
class QbpStreamWriter(streams.StreamWriter):
    """StreamWriter with a helper for sending whole messages."""

    def write_msg(self, msg):
        """Write the message ``msg`` to the underlying stream.

        Args:
            msg: The outgoing message.
        """
        # NOTE(review): ``data`` is not defined anywhere in this excerpt.
        # The step that serialises ``msg`` into bytes appears to have been
        # left out and must be supplied before this method can run.
        self.write(data)
对于收到的消息:
class QbpStreamReader(streams.StreamReader):
    """StreamReader with a helper for receiving whole messages."""

    # NOTE(review): ``asyncio.coroutine`` was removed in Python 3.11. On a
    # newer interpreter ``read_msg`` has to become ``async def`` with
    # ``await`` in place of ``yield from``.
    @asyncio.coroutine
    def read_msg(self):
        """Read one message from the stream and return it.

        Reads ``header_length`` bytes, then ``msg_length`` bytes, and
        returns ``build_message(msg_type, data)`` for the second read.
        """
        # NOTE(review): ``header_length``, ``msg_length``, ``msg_type`` and
        # ``build_message`` are not defined in this excerpt. Presumably the
        # header bytes are parsed into ``msg_type`` / ``msg_length`` between
        # the two reads -- confirm against the full source.
        data = yield from self.readexactly(header_length)
        data = yield from self.readexactly(msg_length)
        return build_message(msg_type, data)
希望能对某些人有所帮助