子进程命令的实时输出

290
我使用一个Python脚本作为流体力学代码的驱动程序。在运行模拟时,我使用 subprocess.Popen 运行代码,并将 stdoutstderr 的输出收集到一个 subprocess.PIPE 中 --- 然后我可以打印(并保存到日志文件中)输出信息,并检查是否有任何错误。问题是,我不知道代码的进展情况。如果我直接从命令行运行它,它会给我关于迭代次数、时间、下一个时间步长等方面的输出信息。 是否有办法既存储输出(用于记录和错误检查),又产生实时输出流? 我的代码相关部分:
ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

最初我是通过teerun_command的输出流复制到日志文件中,并且该流仍然直接输出到终端,但是这种方式无法存储任何错误(据我所知)。


目前我的临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端中运行 tail -f log.txt (其中log_file = 'log.txt')。


1
也许你可以像之前的一个 Stack Overflow 问题中那样使用 Popen.poll。链接:https://dev59.com/bnA75IYBdhLWcg3w_ef1。 - Paulo Almeida
1
一些显示进度指示的命令(例如git)只有在它们的输出是“tty设备”(通过libc isatty()测试)时才会这样做。在这种情况下,您可能需要打开一个伪终端。 - torek
@torek 什么是(伪)tty? - DilithiumMatrix
2
在类Unix系统上,有一些设备可以让进程伪装成串行端口上的用户。例如,这就是ssh(服务器端)的工作原理。请参阅Python pty库pexpect - torek
如果进程成功退出,即返回值为0,则while not ret_val.poll()将创建一个无限循环。 - Viktor Kerkez
显示剩余3条评论
24个回答

242

Python 3的简要概述:

import subprocess
import sys

with open("test.log", "wb") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b""):
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

你有两种方法可以做到这一点,一种是从 readreadline 函数创建一个迭代器并执行:

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b'' for Python 3
    for c in iter(lambda: process.stdout.read(1), ""):
        sys.stdout.write(c)
        f.write(c)
或者
import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b"" for Python 3
    for line in iter(process.stdout.readline, ""):
        sys.stdout.write(line)
        f.write(line)

或者您可以创建一个reader和一个writer文件,将writer传递给Popen并从reader中读取。

import io
import time
import subprocess
import sys

filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

这样你就可以在test.log文件中以及标准输出中获得数据。

文件方式的唯一优势是您的代码不会阻塞。因此,您可以在此期间进行任何操作并以非阻塞方式从reader中读取。当您使用PIPE时,readreadline函数将阻塞,直到管道中写入一个字符或写入一行为止。


3
写入文件,从中读取,并在循环中休眠?还有可能在你完成读取文件之前进程就结束了。 - Guy Sirton
18
在Python 3中,你需要使用iter(process.stdout.readline, b'')来迭代处理器的标准输出流(即传递给iter的哨兵需要是一个二进制字符串,因为b'' != '')。 - John Mellor
4
对于二进制流,请执行以下操作:for line in iter(process.stdout.readline, b''): sys.stdout.buffer.write(line) - rrlamichhane
11
在JohnMellor的回答上补充一点,Python 3中需要进行以下修改:process = subprocess.Popen(command, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) for line in iter(process.stdout.readline, b''): sys.stdout.write(line.decode(sys.stdout.encoding))注:此处为代码示例,为使其更易读懂,无法翻译得完全精准,请以原始代码为准。 - bergercookie
12
但是输出并不是实时的,对吧?根据我的经验,它只会等待进程执行完毕后才会将内容打印到控制台。链接 -> https://dev59.com/KIrda4cB1Zd3GeqPMXLi - denis631
显示剩余12条评论

117

执行摘要(或“tl;dr”版本):如果最多只有一个subprocess.PIPE,那么很容易处理,否则就很难。

现在是时候解释一下subprocess.Popen如何工作了。

(警告:这适用于Python 2.x,尽管3.x类似;而且我对Windows变体非常模糊。我更好地理解POSIX的东西。)

Popen函数需要同时处理零到三个I/O流。这些通常被标记为stdinstdoutstderr

您可以提供:

  • None,表示您不想重定向流。它将像往常一样继承这些。请注意,在至少POSIX系统上,这并不意味着它将使用Python的sys.stdout,而只是Python的实际 stdout;请参见末尾的演示。
  • 一个int值。这是一个“原始”的文件描述符(至少在POSIX中)。 (副注:PIPESTDOUT实际上是内部int,但是是“不可能”的描述符-1和-2。)
  • 一个流 - 实际上是任何具有fileno方法的对象。 Popen将查找该流的描述符,使用stream.fileno(),然后像int值一样继续。
  • subprocess.PIPE,表示Python应创建一个管道。
  • subprocess.STDOUT(仅适用于stderr):告诉Python使用与stdout相同的描述符。这只在您为stdout提供了(非None)值,并且即使如此,它也只是需要如果您设置了stdout=subprocess.PIPE。(否则,您可以只提供与stdout提供的相同参数,例如,Popen(...,stdout = stream,stderr = stream)。)

最简单的情况(没有管道)

如果您不重定向任何内容(将所有三个保留为默认的None值或提供显式的None),则Pipe非常容易处理。它只需启动子进程并让其运行即可。或者,如果您重定向到非PIPE - 一个int或流的fileno(),那么它仍然很容易,因为操作系统会完成所有工作。Python只需启动子进程,将其stdin、stdout和/或stderr连接到提供的文件描述符即可。

还是比较简单的情况:一个管道

如果你只重定向一个流,Pipe 仍然非常容易处理。我们逐个流来看。

假设你想提供一些 stdin,但让 stdoutstderr 未被重定向,或者重定向到文件描述符。作为父进程,你的 Python 程序只需要使用 write() 将数据发送到管道中。你可以自己这样做,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

或者您可以将标准输入数据传递给proc.communicate(),然后执行上述的stdin.write。由于没有输出返回,因此communicate()只有另一个真正的任务:它还会为您关闭管道。(如果您不调用proc.communicate(),则必须调用proc.stdin.close()来关闭管道,以便子进程知道没有更多的数据通过。)
假设您想捕获stdout但保留stdinstderr。同样,这很容易:只需调用proc.stdout.read()(或等效函数),直到没有更多的输出为止。由于proc.stdout()是一个普通的Python I/O流,您可以在其上使用所有常规结构,例如:
for line in proc.stdout:

或者,你可以使用proc.communicate(),它会为你简单地执行read()
如果你只想捕获stderr,那么它的用法与stdout相同。
在事情变得困难之前,还有一个技巧。假设你想捕获stdout,并且也想捕获stderr,但是在与stdout相同的管道上:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,subprocess “作弊”了!好吧,它必须这样做,所以它并不是真正的作弊:它启动子进程,并将其标准输出和标准错误输出都导向反馈到其父(Python)进程的(单个)管道描述符中。在父进程端,仍然只有一个用于读取输出的管道描述符。所有“stderr”输出都会显示在proc.stdout中,如果调用proc.communicate(),则stderr结果(元组中的第二个值)将为None,而不是字符串。
困难的案例:两个或更多的管道
问题都出现在您想要使用至少两个管道时。实际上,subprocess 代码本身就有这部分内容:
def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,不幸的是,我们至少创建了两个,甚至三个不同的管道,因此count(None)返回1或0。我们必须采取更困难的方法。
在Windows上,这使用threading.Thread来累积self.stdoutself.stderr的结果,并且父线程传递self.stdin输入数据(然后关闭管道)。
在POSIX上,如果可用,这将使用poll,否则使用select来累积输出并传递stdin输入。所有这些都在(单个)父进程/线程中运行。
这里需要线程或轮询/选择以避免死锁。例如,假设我们已将所有三个流重定向到三个单独的管道。进一步假设在写入进程被暂停等待从另一端“清除”管道之前,可以将多少数据塞入管道中存在小限制。为了说明问题,让我们将该小限制设置为一个字节。 (事实上,这就是事情的工作方式,只是限制比一个字节大得多。)
如果父(Python)进程尝试写入几个字节-例如,'go\n'proc.stdin,第一个字节进入,然后第二个字节导致Python进程暂停,等待子进程读取第一个字节,从另一端清空管道。
同时,假设子进程决定打印友好的“Hello!不要惊慌!”问候语。 H进入其stdout管道,但e导致它暂停,等待其父进程读取该H,并清空stdout管道。
现在我们陷入了困境:Python进程正在睡眠中,等待完成“go”的说法,而子进程也在睡眠中,等待完成“Hello!不要惊慌!”的说法。 subprocess.Popen代码通过线程或选择/轮询避免了这个问题。当字节可以通过管道时,它们会通过。当不能时,只有一个线程(而不是整个进程)必须睡眠-或者,在选择/轮询的情况下,Python进程同时等待“可以写入”或“数据可用”,仅在有空间时将数据写入进程的stdin,并且仅在准备好数据时读取其stdout和/或stderr。 proc.communicate()代码(实际上是_communicate,其中处理棘手的情况)返回一旦所有stdin数据(如果有)已发送并且已累积所有stdout和/或stderr数据。
如果您想在两个不同的管道上读取stdoutstderr(无论是否重定向了任何stdin),您需要避免死锁。 此处的死锁场景不同-当子进程在您从stdout拉取数据时写入一些长字符到stderr,或反之亦然-但它仍然存在。

演示

我承诺演示未重定向的Python subprocess将写入底层stdout,而不是sys.stdout。 所以,这里有一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
   print 'start show1'
   save = sys.stdout
   sys.stdout = StringIO()
   print 'sys.stdout being buffered'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   in_stdout = sys.stdout.getvalue()
   sys.stdout = save
   print 'in buffer:', in_stdout

def show2():
   print 'start show2'
   save = sys.stdout
   sys.stdout = open(os.devnull, 'w')
   print 'after redirect sys.stdout'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   sys.stdout = save

show1()
show2()

当运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加 stdout=sys.stdout,第一个例程将失败,因为 StringIO 对象没有 fileno。如果添加 stdout=sys.stdout,第二个例程将省略 hello,因为 sys.stdout 已被重定向到 os.devnull
(如果您重定向 Python 的文件描述符 1,则子进程将遵循该重定向。调用 open(os.devnull, 'w') 会产生一个流,其 fileno() 大于 2。)

嗯,你的演示似乎显示了和结论相反的结果。你将Python的标准输出重定向到缓冲区,但是子进程的标准输出仍然会在控制台打印。这有什么用处吗?我错过了些什么吗? - Guy Sirton
@GuySirton :演示显示,当未明确指定到 sys.stdout 时,子进程的标准输出会流向 Python 的标准输出,而不是 Python 程序 的 (sys.) 标准输出。我承认这是一种奇怪的区别。有没有更好的表述方式呢? - torek
2
+1,解释得很好,但缺少具体的代码示例。这里有一个基于asyncio的代码实现“难点”(它可以同时处理多个管道)的可移植方式。您可以将其与使用多个线程(teed_call())执行相同操作的代码进行比较。 - jfs
1
@SamirAguiar:我不知道有什么好的简短摘要,但它很简单:在POSIX操作系统级别,“stdout”就是“文件描述符#1”。当您打开一个文件时,您会得到下一个可用的 fd,通常从3开始(因为0、1和2是stdin、stdout、stderr)。如果您将Python的sys.stdout设置为写入该fd-例如,从最近的open操作中的fd 5-然后进行fork和exec,则您执行的内容将写入其fd#1。除非您做出特殊安排,否则他们的fd1是您的fd1,这不再是您的sys.stdout。 - torek
1
@SamirAguiar 我认为如果你阅读sys模块的文档,它可能会有所帮助。我认为名称的选择不太幸运(令人困惑),但是例如sys.stdout基本上只是“print输出将去哪里”的表示。因此,当您重新分配它时,您实际上并没有重新绑定文件描述符1(这是子进程看到的内容)。sys.__stdout__是“Python的实际标准输出”。 - user98761
显示剩余5条评论

30

我们还可以使用默认的文件迭代器来读取stdout,而不是使用iter结构和readline()

import subprocess
import sys

process = subprocess.Popen(
    your_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in process.stdout:
    sys.stdout.write(line)

3
这里最优雅的答案! - Nir
34
此解决方案不实时显示,它会等待进程完成并一次性显示所有输出。在 Viktor Kerkez 的解决方案中,如果“your_command”逐步显示,则输出也会逐步进行,只要“your_command”定期清空 stdout(由于管道)。 - Eric H.
3
由于它不是现场直播,所以需要这样做。 - melMass
1
这个解决方案迭代默认描述符,因此它只会在输出更新时进行更新。如果需要基于字符的更新,则需要像Viktor的解决方案中所示那样迭代read()方法。但对于我的用例来说,那太过复杂了。 - Jughead
1
非常实时,无需等待进程退出。非常感谢。 - zhukovgreen
显示剩余2条评论

21

除了所有这些答案之外,还有一种简单的方法可以如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要可读流仍然可读并且获取的结果为空,请循环遍历该流,然后停止。

关键在于 readline() 会返回一行(以 \n 结尾),只要还有输出,如果已经到达末尾,则返回空。

希望对某些人有所帮助。


与其使用print(line.strip()),可能更好的是使用print(line, end="")(默认end="\n"),以防行的开头或结尾实际上包含空格。而且,为了防止这些行不以"\n"结尾,也许还可以加上flush=True - undefined

12
如果您能够使用第三方库,您可能可以使用像sarge这样的东西(声明:我是其维护者)。该库允许对子进程的输出流进行非阻塞访问,它是在subprocess模块之上构建的。

顺便说一句,sarge 做得很好。确实解决了 OP 的需求,但对于那种使用情况可能有点过于笨重了。 - deepelement
13
如果您正在建议一种工具,至少展示一个在此确切情况下使用该工具的示例。 - Serhiy

10
如果你只需要在控制台看到输出,那么对于我来说最简单的解决方案是将以下参数传递给Popen
with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:

将使用您的Python脚本标准输入输出文件句柄


7

解决方案1:同时实时记录stdoutstderr

一个简单的解决方案,可以将stdoutstderr同时实时按行记录到日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案2:一个函数read_popen_pipes(),允许您实时并发迭代两个管道(stdout/stderr)。
import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()


感谢您使用“read_popen_pipes”。它非常好用,即使像我这样的Python线程新手也能轻松上手。提醒其他人:假设代码正在函数内运行,“return p.poll()”将会生效。如果要将其作为独立示例运行,请将“return p.poll()”替换为“sys.exit(p.poll())”,并将“my_cmd”替换为["ls"]或您想要运行的任何命令。 - DoomGoober
1
@DoomGoober 感谢你的赞美之词。我已经按照你的建议修复了代码。为了让示例尽可能简单,我没有包含 sys.exit - Rotareti

5

与之前的答案类似,但以下解决方案适用于我在Windows上使用Python3提供实时打印和记录的通用方法(来源)

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

如果我也想在最后返回stdout,我需要修改什么? - Kvothe

3
一种好但比较笨重的解决方案是使用Twisted - 请参见底部。
如果您只想使用stdout,可以尝试以下内容:
import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

如果你使用read()函数,它会试图读取整个“文件”,这并没有什么用处,我们真正需要的是一些能够读取当前管道中所有数据的东西。
也可以尝试使用线程来处理,例如:
import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在我们可以通过使用两个线程来添加stderr。但是,请注意,子进程文档不建议直接使用这些文件,并建议使用`communicate()`(主要涉及死锁问题,我认为以上不是问题),而且解决方案有点笨拙,因此似乎**子进程模块不能完全胜任此工作**(另请参见:http://www.python.org/dev/peps/pep-3145/),我们需要寻找其他解决方案。一种更复杂的解决方案是使用Twisted,如下所示:https://twistedmatrix.com/documents/11.1.0/core/howto/process.html
使用Twisted的方法是使用reactor.spawnprocess()创建进程,并提供一个ProcessProtocol以异步处理输出。 这里是Twisted示例Python代码:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

谢谢!我刚刚尝试了类似这样的东西(基于@PauloAlmeida的评论),但我的subprocess.Popen调用是阻塞的——也就是说,只有在它返回后才进入while循环... - DilithiumMatrix
1
这不是正在发生的事情。它立即进入while循环,然后在read()调用上阻塞,直到子进程退出并且父进程在管道上收到EOF - Alp
@Alp有趣!就是这样。 - DilithiumMatrix
是的,我太心急了发了这个帖子。实际上它无法正常工作,也不能轻松修复。回到起点重新设计吧。 - Guy Sirton
@GuySirton 哈哈,没问题 - 我很感激你的尝试!这很奇怪 - 因为同样的内容被发布为一个(受欢迎的)“答案”,回答了https://dev59.com/bnA75IYBdhLWcg3w_ef1的问题。 - DilithiumMatrix
1
@zhermes:read() 的问题在于它会尝试读取整个输出直到 EOF,这并不实用。readline() 可以帮助解决问题,也许这就是你所需要的(但是过长的行也可能会成为问题)。你还需要注意启动进程时的缓冲区。 - Guy Sirton

3

我找到了一个简单的解决方案来解决一个复杂的问题。

  1. 需要流式传输stdout和stderr。
  2. 两者都需要是非阻塞的:当没有输出时以及当有太多输出时。
  3. 不想使用线程或多进程,也不想使用pexpect。

此解决方案使用我在这里找到的gist:这里

import subprocess as sbp
import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.readline()
    except:
        return ""

with sbp.Popen('find / -name fdsfjdlsjf',
                shell=True,
                universal_newlines=True,
                encoding='utf-8',
                bufsize=1,
                stdout=sbp.PIPE,
                stderr=sbp.PIPE) as p:
    while True:
        out = non_block_read(p.stdout)
        err = non_block_read(p.stderr)
        if out:
            print(out, end='')
        if err:
            print('E: ' + err, end='')
        if p.poll() is not None:
            break

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接