Python asyncio子进程连续写入stdin并读取stdout/stderr

18

我当前正在使用Python3的asyncio模块执行subprocess任务。我的代码简单地向标准输入(stdin)写入内容,并同时读取标准输出(stdout)和标准错误(stderr):

import asyncio


async def read_stdout(stdout):
    print('read_stdout')
    while True:
        buf = await stdout.read(10)
        if not buf:
            break

        print(f'stdout: { buf }')


async def read_stderr(stderr):
    print('read_stderr')
    while True:
        buf = await stderr.read()
        if not buf:
            break

        print(f'stderr: { buf }')


async def write_stdin(stdin):
    print('write_stdin')
    for i in range(100):
        buf = f'line: { i }\n'.encode()
        print(f'stdin: { buf }')

        stdin.write(buf)
        await stdin.drain()
        await asyncio.sleep(0.5)


async def run():
    proc = await asyncio.create_subprocess_exec(
        '/usr/bin/tee',
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    await asyncio.gather(
        read_stderr(proc.stderr),
        read_stdout(proc.stdout),
        write_stdin(proc.stdin))


asyncio.run(run())

它的工作效果相当不错,但我在Python3文档页面上看到了一个警告:

警告 使用communicate()方法而不是process.stdin.write()await process.stdout.read()await process.stderr.read,以避免由于流暂停读取或写入而导致死锁并阻塞子进程。

这是否意味着以上代码会在某些情况下陷入死锁?如果是这样,如何在Python3 asyncio中连续编写stdin并读取stdout / stderr而不出现死锁?

非常感谢。


2
communicate 等待子进程终止。如果您期望多次读取(例如,读取一些内容,向 stdin 写入回复,再次读取等),则 communicate 无法使用。警告仅涉及一次性读取的简单情况... - Bakuriu
1
谢谢您的回复,那么反复读写不会有问题吗? - Nguyen Binh
1个回答

16

这个警告是从常规子进程模块中继承而来的,它警告我们不要尝试实现看似完美正确的简单通信,如下所示:

# write the request to the subprocess
await proc.stdin.write(request)
# read the response
response = await proc.stdout.readline()

如果子进程在读取整个请求之前开始写响应,则可能会导致死锁。如果响应足够大,子进程将阻塞,等待父进程读取其中一些内容并在管道缓冲区中腾出空间。然而,父进程无法这样做,因为它仍在编写响应并等待写入完成后才开始读取。因此,子进程等待父进程读取(部分)响应,而父进程则等待子进程完成接受请求。由于两者都在等待对方的当前操作完成,因此产生了死锁。
由于您的读取和写入是并行执行的,因此您的代码没有这个问题。由于读取器从不等待编写器,反之亦然,因此没有机会发生(那种)死锁。如果您查看实现的方式,您会发现,除了一些调试日志记录外,它的工作方式与您的代码非常相似。

1
非常感谢您提供如此详细的答案! - Nguyen Binh
如果Python文档不那么吓人就好了,因为我和提问者一样,由于同样的担心而来到这里,尽管我的代码实现逻辑与提问者相同,却没有任何问题。 - hl037_
@hl037_ 至少警告可以更加具体一些。它的可怕并非没有道理,因为使用一个看似合理的模式“我只是写一些给子进程然后等待它回应”的方式很容易出现死锁问题(不像你和原帖作者,不是每个人都会同时考虑这两个步骤)。更糟糕的是,死锁通常不会在测试时发生,当负载较小时,而是在之后的某个时间点,这对于调试来说并不愉快。 - user4815162342
是的,至少他们可以先警告风险,然后说“您可以使用communicate()来确保没有死锁”。 - hl037_

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接