如何在Python中将输出多路复用到操作系统文件描述符?

4
subprocess.Popen机制使用底层文件描述符而不是文件对象来写入其stdout/stderr。我需要同时捕获stdoutstderr,同时仍然将它们显示在控制台上。
我该如何创建一个文件描述符,使Popen可以使用它来实现这一点?

使用 subprocess.Popen 捕获输出,然后将其打印回您需要的所有文件描述符。 - jojo
不行,我不能将实际输出缓冲到stderr/stdout流中;用户可能正在与进程交互。 - Chris R
回答了另一个类似的问题,你可以根据我的回答进行修改,以便能够逐步读取stdout/stderr并将它们分别打印到sys.stdoutsys.stderr - samplebias
1个回答

3

这里需要一些背景信息: subprocess 使用您指定的 stdinstdoutstderr 对象的原始文件描述符,因为 它将它们传递给 POSIX。如果使用 subprocess.PIPE,则会使用 os.pipe() 创建一个新的管道。此外,Popen.communicate 会读取直到流的结尾,但如果您想将数据传输到其他地方,则可能不希望这样做。

由于您希望将输出打印到 stdout,我假设它是文本输出。您需要在 Popen 中使用 encodingerrorsuniversal_newlinessubprocess 将文件视为文本(请参阅文档)。

import subprocess

p = subprocess.Popen(
    '/usr/bin/whoami',
    stdout=subprocess.PIPE,  # Control stdout
    universal_newlines=True  # Files opened in text mode
)

# Pipe the data somewhere else too, e.g.: a log file
with open('subprocess.log', 'w') as logfile:
    # p.poll() returns the return code when `p` exits
    while p.poll() is None:
        line = p.stdout.readline()
        # one to our stdout (readline includes the \n)
        print(line, end='')
        # one to the logfile
        logfile.write(line)

同样的技术也可以用于操作 stderr,例如,通过将 file=sys.stderr 传递给 print。请注意,您还可以直接传递自己的 stdin 进行管道处理:

subprocess.Popen('/usr/bin/whoami', stdin=sys.stdin, stdout=subprocess.PIPE, ...)

毕竟,标准流只是包装文件描述符。如果读取到行尾不适合您所期望的输出类型,您可以只read一个非常短的缓冲区。
同时使用stderrstdout 如果需要同时使用stdoutstderr,则会遇到一次只能从一个流中读取的问题。
一种可能的方法是使用os.set_blocking使管道非阻塞,这样任何read方法都会立即返回,如果没有数据。这允许您在流之间交替使用。
另一种可能性是有两个单独的线程处理stdoutstderr;但是,通过aysncio模块有更简单的方法来实现此目的:
import asyncio
import sys

PROCESS_PATH = '/bin/mixed_output'

class MultiplexProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future

    def pipe_data_received(self, fd, data):
        if fd == sys.stdout.fileno():
            print(data.decode('utf-8'), file=sys.stdout, end='')
        elif fd == sys.stderr.fileno():
            print(data.decode('utf-8'), file=sys.stderr, end='')

    def process_exited(self):
        self.exit_future.set_result(True)


async def launch_subprocess(loop):
    # Future marking the end of the process
    exit_future = asyncio.Future(loop=loop)
    # Use asyncio's subprocess
    create_subp = loop.subprocess_exec(
        lambda: MultiplexProtocol(exit_future),
        PROCESS_PATH,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
        stdin=None
    )
    transport, protocol = await create_subp
    await exit_future
    # Close the pipes
    transport.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(launch_subprocess(loop))

与在主进程中不断循环以将数据传输到其他流相比,这要少得多消耗CPU资源,因为只有在需要时才会调用MultiplexProtocol.pipe_data_received


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接