Subprocess.Popen:将stdout和stderr同时克隆到终端和变量中

28

是否可以修改下面的代码,使得以下内容能够被打印到“stdout”和“stderr”中:

  • 实时地在终端上打印出来,
  • 最终存储在 outs 和 errs 变量中?

代码如下:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import subprocess

def run_cmd(command, cwd=None):
    """Run *command* to completion and return (rc, (stdout_text, stderr_text)).

    Both streams are fully captured (nothing is echoed to the terminal)
    and decoded as UTF-8.
    """
    proc = subprocess.Popen(command, cwd=cwd, shell=False,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    raw_out, raw_err = proc.communicate()
    return (proc.returncode,
            (raw_out.decode('utf-8'), raw_err.decode('utf-8')))

感谢@unutbu,特别感谢@j-f-sebastian,最终函数如下:
#!/usr/bin/python3
# -*- coding: utf-8 -*-


import sys
from queue import Queue
from subprocess import PIPE, Popen
from threading import Thread


def read_output(pipe, funcs):
    """Decode each line from the binary *pipe* and hand it to every callback
    in *funcs*; close the pipe once EOF (b'') is reached."""
    while True:
        raw = pipe.readline()
        if raw == b'':
            break
        text = raw.decode('utf-8')
        for callback in funcs:
            callback(text)
    pipe.close()


def write_output(get):
    """Pull lines via the *get* callable and echo them to stdout until the
    None sentinel arrives."""
    line = get()
    while line is not None:
        sys.stdout.write(line)
        line = get()


def run_cmd(command, cwd=None, passthrough=True):
    """Run *command*, optionally mirroring its output to the terminal live.

    Parameters
    ----------
    command : list
        Argument vector handed to Popen (shell=False).
    cwd : str, optional
        Working directory for the child process.
    passthrough : bool
        When True, stream stdout/stderr lines to the terminal as they
        arrive while also capturing them; when False just capture.

    Returns
    -------
    tuple
        (rc, (outs, errs)) — exit code plus captured stdout/stderr text.
    """
    proc = Popen(
        command,
        cwd=cwd,
        shell=False,
        close_fds=True,
        stdout=PIPE,
        stderr=PIPE,
        # NOTE: bufsize=1 (line buffering) is ignored with a RuntimeWarning
        # for binary pipes on Python 3, so it is intentionally omitted.
        )

    if passthrough:
        outs, errs = [], []

        # Each reader thread pushes decoded lines both to the shared queue
        # (for live display) and to its per-stream capture list.
        q = Queue()
        stdout_thread = Thread(
            target=read_output, args=(proc.stdout, [q.put, outs.append]))
        stderr_thread = Thread(
            target=read_output, args=(proc.stderr, [q.put, errs.append]))
        writer_thread = Thread(target=write_output, args=(q.get,))

        for t in (stdout_thread, stderr_thread, writer_thread):
            t.daemon = True
            t.start()

        proc.wait()

        # Wait until both readers drained their pipes completely before
        # signalling the writer to stop.
        for t in (stdout_thread, stderr_thread):
            t.join()

        q.put(None)
        # BUGFIX: join the writer so all captured lines have really reached
        # the terminal before we return (previously it could still be
        # printing when the function returned).
        writer_thread.join()

        # BUGFIX: captured lines already carry their own '\n'; joining with
        # a space inserted spurious separators.  Concatenate directly.
        outs = ''.join(outs)
        errs = ''.join(errs)

    else:
        outs, errs = proc.communicate()
        # BUGFIX: compare against None with `is`, per PEP 8.
        outs = '' if outs is None else outs.decode('utf-8')
        errs = '' if errs is None else errs.decode('utf-8')

    rc = proc.returncode

    return (rc, (outs, errs))

代码示例会存储 outs 和 errs 并将它们返回... 要打印到终端,请简单地使用 if outs: print outs 和 if errs: print errs - bnlucas
2
@bnlucas 谢谢,但正如我在第一点中所述:输出应该实时打印到终端,就像没有使用管道一样。 - Łukasz Zdun
2
如果你需要 Python 3 代码;请添加 [tag:python-3.x] 标签(我看到 shebang 中使用了 python3)。你的代码会导致读取线程无法结束。在 Python 3 中 '' 是 Unicode 字面量,但是 pipe.readline() 默认返回字节类型(在 Python 3 上 '' != b"")。如果你修复了这个问题,写入线程就不会结束,因为没有任何东西将 "" 加入队列中。 - jfs
4个回答

26

要在单个线程中异步捕获并显示子进程的标准输出(stdout)和标准错误(stderr),您可以使用异步I/O:

#!/usr/bin/env python3
import asyncio
import os
import sys
from asyncio.subprocess import PIPE

async def read_stream_and_display(stream, display):
    """Read from stream line by line until EOF, display, and capture the lines.

    Returns the concatenated bytes read from the stream.
    """
    # BUGFIX: @asyncio.coroutine / `yield from` were removed in Python 3.11;
    # use native async/await syntax instead.
    output = []
    while True:
        line = await stream.readline()
        if not line:
            break
        output.append(line)
        display(line)  # assume it doesn't block
    return b''.join(output)

async def read_and_display(*cmd):
    """Capture cmd's stdout, stderr while displaying them as they arrive
    (line by line).

    Returns (returncode, stdout_bytes, stderr_bytes).
    """
    # BUGFIX: converted from the @asyncio.coroutine / `yield from` style
    # (removed in Python 3.11) to native async/await.
    process = await asyncio.create_subprocess_exec(*cmd,
            stdout=PIPE, stderr=PIPE)

    # Read the child's stdout/stderr concurrently so neither pipe can fill
    # up and block the child (capture and display at the same time).
    try:
        stdout, stderr = await asyncio.gather(
            read_stream_and_display(process.stdout, sys.stdout.buffer.write),
            read_stream_and_display(process.stderr, sys.stderr.buffer.write))
    except Exception:
        process.kill()
        raise
    finally:
        # Always reap the child so it does not linger as a zombie.
        rc = await process.wait()
    return rc, stdout, stderr

# run the event loop
# NOTE(review): `cmd` must be defined by the caller before this point
# (e.g. cmd = ['ls', '-l']); it is not defined in this snippet.
if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
# NOTE(review): on Python 3.8+ the proactor loop is already the Windows
# default and asyncio.run() is the preferred entry point; once closed,
# this loop cannot be reused (get_event_loop would return the closed one).
rc, *output = loop.run_until_complete(read_and_display(*cmd))
loop.close()

这段代码看起来不错,你能否添加一个 Python 2.7 版本? - kinORnirvana
1
@kinORnirvana:asyncio仅适用于Python 3.3及以上版本。有一个名为trollius的Python 2克隆版,但是已被弃用 - jfs
1
请注意,一旦循环被关闭,执行 get_event_loop 将会得到同一个已关闭的循环,无法直接重复使用(会出现“event loop is closed”消息)。我最终采用了 asyncio.set_event_loop(asyncio.new_event_loop()) 来获取一个新的事件循环。 - Adversus
2
我在Jupyter Notebook里运行了这段代码。由于sys.stdout.buffer不再存在,导致出现了AttributeError错误。参考了这个文档:https://docs.python.org/3/library/sys.html#sys.stderr 后解决了问题。在Jupyter Notebook中使用了sys.stdout.write代替sys.stdout.buffer.write,输出显示在日志记录输出窗口中。 - dmmfll
1
我发现这会显示所有来自stderr的输出,然后是所有来自stdout的输出。 - Dan Hook
显示剩余2条评论

19
你可以生成线程来读取stdout和stderr管道,将内容写入一个公共队列,并将其添加到列表中。然后使用第三个线程从队列中打印条目。
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE


def read_output(pipe, funcs):
    """Hand every line read from *pipe* to each callable in *funcs*, then
    close the pipe at EOF (empty string)."""
    while True:
        line = pipe.readline()
        if line == '':
            break
        for func in funcs:
            func(line)
            # time.sleep(1)
    pipe.close()

def write_output(get):
    """Print items produced by *get()* to stdout, stopping at the None
    sentinel."""
    while True:
        chunk = get()
        if chunk is None:
            break
        sys.stdout.write(chunk)

# NOTE(review): Python 2 snippet — the `Queue` module was renamed `queue`
# in Python 3, and random_print.py must be executable on PATH.
# Launch the child with both streams piped; close_fds avoids leaking
# inherited descriptors, bufsize=1 requests line buffering.
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True, bufsize=1)
q = Queue.Queue()
out, err = [], []
# One reader thread per pipe: each line goes both to the shared queue
# (for printing) and to the per-stream capture list.
tout = threading.Thread(
    target=read_output, args=(process.stdout, [q.put, out.append]))
terr = threading.Thread(
    target=read_output, args=(process.stderr, [q.put, err.append]))
# A single writer thread prints queued lines so the two streams cannot
# interleave mid-line on the terminal.
twrite = threading.Thread(target=write_output, args=(q.get,))
for t in (tout, terr, twrite):
    t.daemon = True
    t.start()
process.wait()
# Let both readers drain their pipes fully, then tell the writer to stop.
for t in (tout, terr):
    t.join()
q.put(None)
print(out)
print(err)

使用第三个线程的原因是为了防止两个打印语句同时发生,这可能导致文本错误。而不是让前两个线程直接打印到终端。上述代码调用random_print.py,在随机时间向stdout和stderr打印内容:
import sys
import time
import random

for i in range(50):
    f = random.choice([sys.stdout,sys.stderr])
    f.write(str(i)+'\n')
    f.flush()
    time.sleep(0.1)

这个解决方案借鉴了 J. F. Sebastian 在 这里 的代码和思路。


下面是适用于类 Unix 系统的另一种方案,使用 select.select

import collections
import select
import fcntl
import os
import time
import Queue
import sys
import threading
import subprocess
PIPE = subprocess.PIPE

def make_async(fd):
    # https://dev59.com/dVzUa4cB1Zd3GeqP0B84#7730201
    '''add the O_NONBLOCK flag to a file descriptor'''
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

def read_async(fd):
    # https://dev59.com/dVzUa4cB1Zd3GeqP0B84#7730201
    '''read some data from a file descriptor, ignoring EAGAIN errors'''
    # BUGFIX: `except IOError, e` is Python 2 syntax (a SyntaxError on
    # Python 3) and `errno` was never imported; both are fixed here.
    import errno
    # time.sleep(1)
    try:
        return fd.read()
    except IOError as e:
        if e.errno != errno.EAGAIN:
            raise
        return ''

def write_output(fds, outmap):
    """Drain each readable descriptor in *fds*, echo the data to stdout,
    and record it in *outmap* keyed by fileno."""
    for fd in fds:
        chunk = read_async(fd)
        sys.stdout.write(chunk)
        outmap[fd.fileno()].append(chunk)

# NOTE(review): Unix-only snippet (fcntl/select); random_print.py must be
# executable on PATH.
process = subprocess.Popen(
    ['random_print.py'], stdout=PIPE, stderr=PIPE, close_fds=True)

# Put both pipes into non-blocking mode so read() never stalls.
make_async(process.stdout)
make_async(process.stderr)
outmap = collections.defaultdict(list)
while True:
    # Block until at least one of the two pipes has data ready.
    rlist, wlist, xlist = select.select([process.stdout, process.stderr], [], [])
    write_output(rlist, outmap)
    if process.poll() is not None:
        # Child exited: flush whatever is still buffered in both pipes.
        write_output([process.stdout, process.stderr], outmap)
        break

# Map stream names to their file descriptors for the final lookup.
fileno = {'stdout': process.stdout.fileno(),
          'stderr': process.stderr.fileno()}

print(outmap[fileno['stdout']])
print(outmap[fileno['stderr']])

这个解决方案使用了Adam Rosenfield的帖子中的代码和思路


process.wait() 之后,您可以添加 q.put(None) 并在 None 上退出第3个线程,例如 for line in iter(get, None):。另外,缺少 pipe.close() - jfs
@J.F.Sebastian:感谢您的纠正。假设由于某种原因read_output没有跟上写入到pipe中的输出。(我试图通过在上面加一个time.sleep(1)来模拟这种情况)。当取消注释time.sleep(1)时,out 和 err 无法在process.wait()完成之前收集所有输出。您知道保证 out 和 err 获取所有输出的方法吗? - unutbu
put(None)之前使用t{err,out}.join()。顺便说一句,为了实时获取行,bufsize=1可能会有所帮助(忽略块缓冲问题)。 - jfs

0

为了将子进程的实时输出(stdout和stderr)同时传输到终端和变量中,您可以生成两个线程来并发处理这些流。

改编自我更详细的答案

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time.

    Each stdout/stderr line is passed (without its trailing newline) to the
    matching handler as soon as it arrives.  Raises CalledProcessError when
    check=True and the command exits non-zero; otherwise returns a
    CompletedProcess (without captured output — the handlers own the lines).
    """
    with Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process:
        with ThreadPoolExecutor(2) as pool:  # two threads to handle the streams
            # deque(maxlen=0) consumes a generator at C speed; each pool
            # thread drains one stream so neither pipe can fill and block.
            exhaust = partial(pool.submit, partial(deque, maxlen=0))
            # BUGFIX: line[:-1] chopped the final character of a last line
            # that lacks a trailing newline; strip the newline explicitly.
            exhaust(stdout_handler(line.rstrip("\n")) for line in process.stdout)
            exhaust(stderr_handler(line.rstrip("\n")) for line in process.stderr)
    retcode = process.poll()
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

使用自定义处理程序进行调用:

# Capture each stream into a list while also echoing it to the terminal.
outs, errs = [], []
def stdout_handler(line):
    # Record then display the stdout line (newline already stripped).
    outs.append(line)
    print(line)
def stderr_handler(line):
    # Record then display the stderr line.
    errs.append(line)
    print(line)

stream_command(
    ["echo", "test"],
    stdout_handler=stdout_handler,
    stderr_handler=stderr_handler,
)
# test
print(outs)
# ['test']

0
以下是您的`run_cmd`函数在Python 3.11中使用`asyncio`的另一种版本:
import asyncio
import io
import sys
from subprocess import SubprocessError


# Maximum number of bytes to read at once from the 'asyncio.subprocess.PIPE'
_MAX_BUFFER_CHUNK_SIZE = 1024

async def run_cmd_async(command, cwd=None, check=False):
    stdout_buffer = io.BytesIO()
    stderr_buffer = io.BytesIO()
    process = await asyncio.subprocess.create_subprocess_exec(
        *command,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)

    async def write_stdout() -> None:
        assert process.stdout is not None
        while chunk := await process.stdout.read(_MAX_BUFFER_CHUNK_SIZE):
            stdout_buffer.write(chunk)
            print(chunk.decode(), end="", flush=True)

    async def write_stderr() -> None:
        assert process.stderr is not None
        while chunk := await process.stderr.read(_MAX_BUFFER_CHUNK_SIZE):
            stderr_buffer.write(chunk)
            print(chunk.decode(), file=sys.stderr, end="", flush=True)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(write_stdout())
        task_group.create_task(write_stderr())

        exit_code = await process.wait()
        if check and exit_code != 0:
            raise SubprocessError(
                f"Command '{command}' returned non-zero exit status {exit_code}."
            )
    return exit_code, (stdout_buffer.getvalue().decode(),
                       stderr_buffer.getvalue().decode())


def run_cmd(command, cwd=None, check=False):
    """Synchronous wrapper: run run_cmd_async on a fresh event loop and
    return its (exit_code, (stdout_text, stderr_text)) result."""
    return asyncio.run(run_cmd_async(command, cwd=cwd, check=check))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接