如何在Python中将两个子进程的标准输出连接并管道到新子进程的标准输入?

6
假设我从shell中运行了以下命令:
{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3

对于那些不熟悉samtools view的人(因为这是stackoverflow)。本质上,这是创建具有新标题的新bam文件。bam文件通常是大型压缩文件,因此即使在某些情况下通过文件进行传递也可能耗时。一种替代方法是执行command2,然后使用samtools reheader切换标题。这将两次经过大文件。上述命令只经过一次bam,适用于更大的bam文件(它们即使在压缩后也可能达到20GB以上-WGS)。

我的问题是如何使用subprocess在Python中实现这种类型的命令。

我有以下内容:

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### SOMEHOW APPEND sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam)

非常感激您的帮助。谢谢。


fh_bam是什么?为什么不直接从两者的输出中获取并在最后一个进程的命令中使用它? - Padraic Cunningham
输出bam文件的文件处理器。好吧,前两个命令基本上是读取文件的一部分并将其放入stdout中。因此,“获取输出”已经在文件中可用。唯一的区别是第二个命令实际上是抓取文件的一部分。而且那个特定的文件是压缩的,所以追加文件并不简单。 - Marco Albuquerque
你想要第二次调用的输出还是两次调用的输出都要? - Padraic Cunningham
我想要将前两个命令的输出合并(这两个命令都将压缩文件转换为文本),但是不写入磁盘,而是直接传输到下一个命令,该命令在写入之前会对文件进行压缩。 - Marco Albuquerque
4个回答

4

如果您已经在字符串中有了shell命令,则可以直接运行它:

#!/usr/bin/env python
from subprocess import check_call

check_call(r"""
{ 
samtools view -HS header.sam;           # command1
samtools view input.bam 1:1-50000000;   # command2
} | samtools view -bS - > output.bam    # command3
""", shell=True)

为了在Python中模拟管道,请按照以下步骤操作:
#!/usr/bin/env python
from subprocess import Popen, PIPE

# start command3 to get stdin pipe, redirect output to the file
with open('output.bam', 'wb', 0) as output_file:
    command3 = Popen("samtools view -bS -".split(), 
                     stdin=PIPE, stdout=output_file)
# start command1 with its stdout redirected to command3 stdin
command1 = Popen('samtools view -HS header.sam'.split(),
                 stdout=command3.stdin)
rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies
# start command2 after command1 finishes
command2 = Popen('samtools view input.bam 1:1-50000000'.split(),
                 stdout=command3.stdin)
command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error)
rc_command2 = command2.wait()
rc_command3 = command3.wait()

1

很遗憾,我不能评论,但是这个“答案”是对cmidi答案的评论,如果有人能移动它,将不胜感激!-- PS:那个回答现在已经被删除了...

Marco明确表示这些命令会产生大量输出,约为20GB。如果使用communicate(),它将等待进程终止,这意味着'fd'描述符需要保存那么多的数据。实际上,在此期间操作系统将把数据刷新到磁盘上,除非您的计算机有超过20GB的空闲RAM。因此,最终你会将中间数据写入磁盘,而原始作者想要避免这种情况。 sirlark的答案+1!


是的,说得好,尤其是关于管道阻塞的问题。你先到了那里。 - sirlark
@Ariel:FYI,sirlark的答案行不通 - jfs

0

我假设由于涉及的文件大小,将第一和第二个子进程的输出连接在内存中不可行。我建议将前两个子进程的输出包装在类似文件的文件中。看起来你只需要read方法,因为popen只会从其stdin文件中读取,而不是寻找或写入。下面的代码假定从read返回空字符串就足以表示流已到达EOF。

class concat(object):
    def __init__(self, f1, f2):
        self.f1 = f1
        self.f2 = f2

    def read(self, *args):
        ret = self.f1.read(*args)
        if ret == '':
            ret = self.f2.read(*args)
        return ret

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### Somehow Append sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=concat(sub_0.stdout, sub_1.stdout), stdout=fh_bam)

为了澄清,f1.read会阻塞,并且只有在管道关闭/到达文件末尾时才返回''。只有在这种情况发生后,concat.read 才会尝试从f2读取,所以来自f1f2的输出不会交织在一起。当然,反复读取f1的末尾会稍微增加一点开销,但可以通过设置一个标志变量来指示要从哪个文件中读取来避免这种情况。虽然我怀疑这不会显著改善性能。

你能解释一下 read(self, *args) 函数在做什么吗?我不明白为什么要检查 ret 是否等于 ''。 - Marco Albuquerque
1
无法保证popen如何调用您提供的stdin管道文件对象的read方法。read可以使用大小参数限制读取内容,这正是您想要的,否则所有内容都将被读入内存。但这意味着第一个进程的stdout可能不会在一次调用read中完全读取。因此,我们从f1的stdout读取,直到它为空,然后才转到f2的stdout... - sirlark
1
downvote。Popen的stdin不接受类似文件的对象。它需要一个真正的.fileno()(在某些系统上是真正的文件,管道或套接字)。还有其他问题。 - jfs
是的,@J.F.Sebastian是正确的 :( 我在平板电脑上匆忙编写了这个代码,没有测试就假设它会工作。文档说Popen接受文件类对象,但测试表明读/写方法从未被调用,而需要一个fileno方法。可能是使用了os.read(<fd>)和os.write(<fd>)。我已经发布了另一个使用管道的答案。 - sirlark
@sirlark:os.read(<fd>)subprocess模块中并未使用。通过类似于os.dup2(<fd>, 0)的方式重定向了管道。子进程可以按照自己的喜好从输入中读取,例如stdio的getchar()函数。 - jfs

-1

虽然Popen接受类似文件的对象,但实际上它使用底层的文件句柄/描述符来通信,而不是文件对象的读写方法,正如@J.F. Sebastian所指出的那样。更好的方法是使用管道(os.pipe()),这不会使用磁盘。这允许您直接将输出流连接到另一个进程的输入流,这正是您想要的。问题只是序列化的问题,以确保两个源流不会交错。

import os
import subprocess

r, w = os.pipe()

fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096)
sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src1.communicate()
sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src2.communicate()

我们首先打开接收端(管道的读取器),然后只与源进程进行通信,以避免像@Ariel所提到的潜在阻塞。这也迫使第一个源进程完成并在管道上刷新其输出,然后第二个源进程才有机会写入管道,从而防止交错/覆盖输出。您可以玩弄bufsize值来调整性能。
这基本上就是shell命令正在做的事情。

(1)在这种情况下,“bufsize”对性能没有影响。原则上,(由于“stderr = PIPE”),“bufsize”可能会影响如何读取“stderr”(尽管在这种情况下不会,因为在posix上的“.communicate()”不使用“stderr.read()”——它使用“select”)。 “bufsize”对stdin,stdout没有影响,因为它们没有分配给PIPE。(2)如果您删除“stderr = PIPE”,那么“.communicate()”调用是不必要的,可以像我的答案一样使用“.wait()”。(3)在父进程中应关闭未使用的管道端口(在将它们传递给子进程之后)。 - jfs

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接