同时监控子进程的标准输出和标准错误输出

11

我如何同时查看长时间运行的子进程的标准输出和标准错误,并在子进程生成每一行时立即处理每一行?

我不介意使用Python3.6的异步工具来创建预期为非阻塞的异步循环,但这似乎并不能解决问题。下面的代码:

import asyncio
from asyncio.subprocess import PIPE
from datetime import datetime


async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in p.stdout:
        print(datetime.now(), f.decode().strip())
    async for f in p.stderr:
        print(datetime.now(), "E:", f.decode().strip())

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run('''
         echo "Out 1";
         sleep 1;
         echo "Err 1" >&2;
         sleep 1;
         echo "Out 2"
    '''))
    loop.close()

输出:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:37.770187 Out 2
2018-06-18 00:06:37.770882 E: Err 1

虽然我期望它会输出类似这样的内容:

2018-06-18 00:06:35.766948 Out 1
2018-06-18 00:06:36.770882 E: Err 1
2018-06-18 00:06:37.770187 Out 2
3个回答

15
为了实现这一目标,您需要一个函数来接受两个异步序列并将它们合并在一起,以便在其中任意一个序列产生结果时都可以得到结果。如果有这样的函数可用,那么run可能如下所示:
async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for f in merge(p.stdout, p.stderr):
        print(datetime.now(), f.decode().strip())

标准库中目前(还)不存在像merge这样的函数,但是aiostream外部库提供了一个。你也可以使用异步生成器和asyncio.wait()编写自己的merge函数:

async def merge(*iterables):
    iter_next = {it.__aiter__(): None for it in iterables}
    while iter_next:
        for it, it_next in iter_next.items():
            if it_next is None:
                fut = asyncio.ensure_future(it.__anext__())
                fut._orig_iter = it
                iter_next[it] = fut
        done, _ = await asyncio.wait(iter_next.values(),
                                     return_when=asyncio.FIRST_COMPLETED)
        for fut in done:
            iter_next[fut._orig_iter] = None
            try:
                ret = fut.result()
            except StopAsyncIteration:
                del iter_next[fut._orig_iter]
                continue
            yield ret
上述run与您期望的输出仍然有一个细节上的区别: 它不能区分输出和错误行。但这可以通过在这些行上添加指示符来轻松实现:
async def decorate_with(it, prefix):
    async for item in it:
        yield prefix, item

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    async for is_out, line in merge(decorate_with(p.stdout, True),
                                    decorate_with(p.stderr, False)):
        if is_out:
            print(datetime.now(), line.decode().strip())
        else:
            print(datetime.now(), "E:", line.decode().strip())

1
解决了问题,谢谢!我没想到他们需要第三方库,我以为标准库里有工具可以做到这一点。还要注意的是,我必须将循环包装在“async with”语句中,否则我会收到警告:“'AsyncIteratorContext'在其上下文之外被迭代”。 - Aleph Aleph
@AlephAleph 我遇到了类似的问题,但我不确定应该如何精确地包装合并。你能详细说明一下吗?谢谢! - Liviu

6

我想到了一个更简单的解决方案,至少在观察代码不需要在单个协程调用的情况下是这样。

您可以生成两个单独的协程,一个用于stdout,另一个用于stderr。并行运行它们将为您提供所需的语义,您可以使用gather等待它们的完成:

def watch(stream, prefix=''):
    async for line in stream:
        print(datetime.now(), prefix, line.decode().strip())

async def run(cmd):
    p = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    await asyncio.gather(watch(p.stdout), watch(p.stderr, 'E:'))

0

这里有一个没有任何外部依赖的示例:

def h_out(s):
    print(f"[O] {s}")

def h_err(s): 
    print(f"[E] {s}")

async def _rs(stream, cb, enc):  
    while True:
        line = await stream.readline()
        if line:
            line = line.decode(enc)
            cb(line.rstrip())
        else:
            break

cmd = ['tail', '-f', '/var/log/syslog']
enc = 'utf-8'
p = await asyncio.create_subprocess_exec(*cmd
                        , stdout=PIPE, stderr=PIPE)

await asyncio.wait([_rs(p.stdout, h_out, enc)
                    ,_rs(p.stderr, h_err, enc)])

await p.wait()

这段代码的完整示例适用于Windows,Linux,BSD等操作系统:github.com/JavaScriptDude/PyTail

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接