Python异步IO:如何读取标准输入并写入标准输出?

11

我需要异步读取标准输入来获取消息(以\r\n为结尾的JSON格式),并在处理后异步写入更新的消息到标准输出流。

目前我是同步地进行操作,例如:

class SyncIOStdInOut():
    def write(self, payload: str):
        sys.stdout.write(payload)
        sys.stdout.write('\r\n')
        sys.stdout.flush()

    def read(self) -> str:
        payload=sys.stdin.readline()
        return  payload

如何以异步方式执行相同的操作?
2个回答

21
这是使用asyncio streams(适用于Unix)将标准输入stdin回显到标准输出stdout的示例。
import asyncio
import sys


async def connect_stdin_stdout():
    loop = asyncio.get_event_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    w_transport, w_protocol = await loop.connect_write_pipe(asyncio.streams.FlowControlMixin, sys.stdout)
    writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)
    return reader, writer


async def main():
    reader, writer = await connect_stdin_stdout()
    while True:
        res = await reader.read(100)
        if not res:
            break
        writer.write(res)
        await writer.drain()


if __name__ == "__main__":
    asyncio.run(main())

作为一个即插即用的解决方案,你可以使用aioconsole库。它实现了类似的方法,同时还提供了额外有用的异步等效函数来替代inputprintexeccode.interact
from aioconsole import get_standard_streams

async def main():
    reader, writer = await get_standard_streams()

更新:

让我们试着弄清楚函数connect_stdin_stdout的工作原理。

  1. 获取当前的事件循环:
loop = asyncio.get_event_loop()

创建一个StreamReader实例。
reader = asyncio.StreamReader()

通常情况下,StreamReader/StreamWriter类不打算直接实例化,而应该只作为诸如open_connection()start_server()等函数的结果使用。 StreamReader提供了对某个数据流的缓冲异步接口。一些源代码(库代码)调用它的函数,例如feed_datafeed_eof,数据被缓冲起来,并可以使用documented接口协程read()readline()等进行读取。
  1. 创建StreamReaderProtocol实例。
protocol = asyncio.StreamReaderProtocol(reader)

这个类继承自asyncio.ProtocolFlowControlMixin,用于在ProtocolStreamReader之间进行适配。它重写了Protocoldata_receivedeof_received等方法,并调用StreamReaderfeed_data方法。
  1. 在事件循环中注册标准输入流stdin
await loop.connect_read_pipe(lambda: protocol, sys.stdin)

connect_read_pipe函数的pipe参数是一个类似文件的对象。而stdin就是一个类似文件的对象。从现在开始,所有从stdin读取的数据都会进入StreamReaderProtocol,然后传递给StreamReader

  1. 在事件循环中注册标准输出流stdout
w_transport, w_protocol = await loop.connect_write_pipe(FlowControlMixin, sys.stdout)

在`connect_write_pipe`中,您需要传递一个协议工厂,该工厂创建实现`StreamWriter.drain()`流控逻辑的协议实例。这个逻辑是在`FlowControlMixin`类中实现的。同时,`StreamReaderProtocol`也继承自它。
6. 创建`StreamWriter`实例。
writer = asyncio.StreamWriter(w_transport, w_protocol, reader, loop)

这个类使用write()writelines()等函数将传递给它的数据转发到底层的transportprotocol用于支持drain()函数,以等待底层传输刷新其内部缓冲区并再次可写入。 reader是一个可选参数,可以是None,它也用于支持drain()函数,在该函数开始时检查读取器是否设置了异常,例如由于连接丢失(对套接字和双向连接相关),然后drain()也会抛出异常。
您可以在这个很棒的答案中了解更多关于StreamWriterdrain()函数的信息。

更新2:

要使用\r\n分隔符读取行,可以使用readuntil

你好,稍后我会尽量详细地描述。 - alex_noname
这是我到目前为止达到的最远程度:https://pastebin.com/2Rwch6Dr - user3225309
我稍微修改了这个函数并更新了答案。 - alex_noname
еңЁдёҚзӯүеҫ…writer.writeи°ғз”Ёзҡ„ең°ж–№жҳҜдёҚжҳҜжү“й”ҷдәҶпјҹ - oliora
不,这不是一个打字错误。但是在使用await drain()时使用write是正确的。我已经更新了答案。谢谢你的注意。 - alex_noname
显示剩余8条评论

2
这是另一种从标准输入异步读取的方法(每次读取一行)。
async def async_read_stdin()->str:
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, sys.stdin.readline)

1
谢谢,但我更喜欢@alex_noname建议的。 - user3225309
如果调用此任务的asycnio被取消,则在readline()上阻塞的后台线程仍将运行,并且由于https://dev59.com/P1UL5IYBdhLWcg3wqpk7#49992422,程序将不会退出。 - David Lechner
这是使用线程池进行阻塞调用吗?我认为这不是真正的异步。 - undefined

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接