如何在Python中编写一个通用方法来执行多个管道连接的shell命令?

3

我有很多需要在Python脚本中执行的Shell命令。我知道不应该像这里所提到的那样使用shell=true,而是可以像这里所提到的那样,在命令中使用标准输出和输入。

但问题是我的Shell命令非常复杂,充满了管道操作,因此我想编写一个通用的方法供我的脚本使用。

我下面做了一个小测试,但在打印结果后就一直挂起(我简化了代码才放在这里)。请问有人能告诉我:

  1. 为什么会挂起。
  2. 是否有更好的方法来实现这个功能。

谢谢。

PS:这只是一个大型Python项目的一小部分,我之所以尝试这样做,是因为有业务需求。感谢。

#!/usr/bin/env python3
import subprocess as sub
from subprocess import Popen, PIPE
import shlex

def exec_cmd(cmd,p=None,isFirstLoop=True):
   if not isFirstLoop and not p:
       print("Error, p is null")
       exit()
   if "|" in cmd:
       cmds = cmd.split("|")
       while "|" in cmd:
           # separates what is before and what is after the first pipe
           now_cmd = cmd.split('|',1)[0].strip()
           next_cmd = cmd.split('|',1)[-1].strip()
           try:
               if isFirstLoop:
                   p1 = sub.Popen(shlex.split(now_cmd), stdout=PIPE)
                   exec_cmd(next_cmd,p1,False)
               else:
                   p2 = sub.Popen(shlex.split(now_cmd),stdin=p.stdout, stdout=PIPE)
                   exec_cmd(next_cmd,p2,False)
           except Exception as e:
               print("Error executing command '{0}'.\nOutput:\n:{1}".format(cmd,str(e)))
               exit()
           # Adjust cmd to execute the next part
           cmd = next_cmd
   else:
       proc = sub.Popen(shlex.split(cmd),stdin=p.stdout, stdout=PIPE, universal_newlines=True)
       (out,err) = proc.communicate()
       if err:
           print(str(err).strip())
       else:
           print(out)



exec_cmd("ls -ltrh | awk '{print $9}' | wc -l ")

你可以尝试在代码审查的堆栈交换网站上发布这个问题,或许会有一些好运气。我认为,命令解析的方式存在一些潜在的问题,非常类似于使用shell=True时的问题... 这段代码并没有好到哪里去。其次,如果你使用stdout=PIPE创建一个管道,然后传递stdin=proc.stdout来使用同一个管道,这是连接程序的更好方法。但这需要一些思考。 - Dietrich Epp
好的,让我尝试这两个建议。谢谢。 - Arthur Accioly
@DietrichEpp 代码审查既不接受不能工作的代码,也不接受为了展示特定行为而简化的代码。这个问题有一个清晰的问题(代码挂起),可以在SO上得到解决。 - 301_Moved_Permanently
1
这是一个简单的方法:https://gist.github.com/depp/c798381d6810657f528fe8d0d8013729 - Dietrich Epp
@DietrichEpp,我认为您的答案也应该添加到下面的答案中。谢谢。 - Arthur Accioly
@ArthurAccioly:我不想发布一个我不支持的代码答案。这只是一条评论,因为还有一些边缘情况需要考虑。 - Dietrich Epp
2个回答

3
很不幸,这里有一些边缘情况需要Shell替你处理,或者,完全忽略掉。一些要注意的事情如下:
  • 函数应始终等待每个进程完成wait(),否则会出现所谓的僵尸进程

  • 命令应该使用真正的管道连接起来,这样就不需要一次性将整个输出读入内存。这是管道的正常工作方式。

  • 在父进程中应该关闭每个管道的读端,以便子进程可以在下一个进程关闭其输入时正确地发出SIGPIPE信号。如果没有这样做,父进程可能会保持管道处于打开状态,而子进程不知道退出,它可能会一直运行。

  • 子进程中的错误应该被抛出异常,除了SIGPIPE。对于最后一个进程上的SIGPIPE,读者需要自己决定是否抛出异常,因为SIGPIPE在那里并不会被预料到,但忽略它并不会造成伤害。

请注意,subprocess.DEVNULL在Python 3.3之前不存在。我知道你们中有一些人仍在使用2.x,你们需要手动打开一个文件来获得/dev/null,或者只是决定管道中的第一个进程与父进程共享stdin

以下是代码:

import signal
import subprocess

def run_pipe(*cmds):
    """Run a pipe that chains several commands together."""
    pipe = subprocess.DEVNULL
    procs = []
    try:
        for cmd in cmds:
            proc = subprocess.Popen(cmd, stdin=pipe,
                                    stdout=subprocess.PIPE)
            procs.append(proc)
            if pipe is not subprocess.DEVNULL:
                pipe.close()
            pipe = proc.stdout
        stdout, _ = proc.communicate()
    finally:
        # Must call wait() on every process, otherwise you get
        # zombies.
        for proc in procs:
            proc.wait()
    # Fail if any command in the pipe failed, except due to SIGPIPE
    # which is expected.
    for proc in procs:
        if (proc.returncode
            and proc.returncode != -signal.SIGPIPE):
            raise subprocess.CalledProcessError(
                proc.returncode, proc.args)
    return stdout

在这里,我们可以看到它的实际应用。您可以看到管道正确地以yes(运行直到SIGPIPE)终止,并且以false(始终失败)正确地失败。

In [1]: run_pipe(["yes"], ["head", "-n", "1"])
Out[1]: b'y\n'

In [2]: run_pipe(["false"], ["true"])
---------------------------------------------------------------------------
CalledProcessError                        Traceback (most recent call last)
<ipython-input-2-db97c6876cd7> in <module>()
----> 1 run_pipe(["false"], ["true"])

~/test.py in run_pipe(*cmds)
     22     for proc in procs:
     23         if proc.returncode and proc.returncode != -signal.SIGPIPE:
---> 24             raise subprocess.CalledProcessError(proc.returncode, proc.args)
     25     return stdout

CalledProcessError: Command '['false']' returned non-zero exit status 1

很好,我喜欢你强调了所有涉及的风险。谢谢。 - Arthur Accioly

3

不要使用shell字符串并尝试用自己的方法解析它,而是要求用户自己提供命令作为单独的实体。这样可以避免检测到作为shell管道的一部分而不是使用的|的明显陷阱。您可以选择让他们提供命令作为字符串列表或单个字符串,之后您将使用shlex.split进行拆分,这取决于您想要公开的接口。在下面的示例中,我会选择第一个选项,因为它更简单。

一旦您有了单独的命令,一个简单的for循环就足以将前一个命令的输出导入到下一个命令的输入中,正如您自己发现的那样

def pipe_subprocesses(*commands):
    if not commands:
        return

    next_input = None
    for command in commands:
        p = subprocess.Popen(command, stdin=next_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout

    out, err = p.communicate()
    if err:
        print(err.decode().strip())
    else:
        print(out.decode())

使用示例:

>>> pipe_subprocesses(['ls', '-lhtr'], ['awk', '{print $9}'], ['wc', '-l'])
25

现在,这是一种快速而粗略的方法来设置它,并且似乎按照您想要的方式工作。但是这段代码至少存在两个问题:

  1. 您会泄漏僵尸进程/打开的进程句柄,因为只收集了最后一个进程的退出代码;并且操作系统仍然保持资源处于打开状态,以便您能够执行此操作;
  2. 如果某个进程在中途失败,则无法访问该进程的信息。

为了避免这种情况,您需要维护一个已打开的进程列表,并显式地 wait(等待)每个进程。因为我不知道您的确切用例,所以我将返回第一个失败的进程(如果有的话)或最后一个进程(如果没有),以便您可以相应地采取行动:

def pipe_subprocesses(*commands):
    if not commands:
        return

    processes = []
    next_input = None
    for command in commands:
        if isinstance(command, str):
            command = shlex.split(command)
        p = subprocess.Popen(command, stdin=next_input, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        next_input = p.stdout
        processes.append(p)

    for p in processes:
        p.wait()

    for p in processes:
        if p.returncode != 0:
            return p
    return p  # return the last process in case everything went well

我还举了一个shlex的例子,以便您可以混合使用原始字符串和已解析的列表:

>>> pipe_subprocesses('ls -lhtr', ['awk', '{print $9}'], 'wc -l')
25

1
这样做会丢失错误(忽略所有进程的返回代码),可能会泄漏进程(不等待所有子进程),如果您不打算向第一个进程写入任何内容,则第一个进程的输入不应该是管道。 - Dietrich Epp
我发布的代码只是一个"这就是如何做到这一点"的简单示例,没有提供真正的解释或上下文。我认为它并不是 Stack Overflow 上优质的回答。实际上,在实现这个功能时存在一些微妙的问题,由于时间紧迫,我无法对其进行详细说明,因此我发布了一个要点摘要。 - Dietrich Epp
是的,我在评论中提到了分割不正确,这就是为什么该函数被拆分成两个部分的原因。然而,对数组procs进行预分配对我来说有点荒谬,因为你正在为了可能无法衡量的性能收益而使代码变得更加复杂和难以理解--每次通过循环都必须进行fork/exec,这比append昂贵得多。 - Dietrich Epp

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接