块 - 将输入发送到Python子进程管道

34

我正在使用Python测试子进程流水线。我知道我可以直接在Python中完成以下程序所做的事情,但这不是重点。我只想测试一下流水线,以便了解如何使用它。

我的系统是Linux Ubuntu 9.04,默认使用Python 2.6。

我从这个文档示例开始。

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

那样做是可行的,但由于的标准输入未被重定向,我必须在终端中键入内容以供管道使用。当我键入^D关闭标准输入时,我得到了想要的输出。

然而,我想使用Python字符串变量将数据发送到管道。首先,我尝试在标准输入上写入:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

没起作用。我尝试在上一行使用p2.stdout.read(),但它也会阻塞。我添加了p1.stdin.flush()p1.stdin.close(),但它们也没有起作用。然后我转向使用通信:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

所以这还不是它。
我注意到运行单个进程(如上面的p1,去掉p2)可以完美地工作。并且将文件句柄传递给p1stdin=open(...))也可以工作。所以问题是:
在Python中是否有可能向由2个或更多个子进程组成的管道传递数据,而不会阻塞?为什么不行?
我知道我可以运行一个shell并在shell中运行管道,但那不是我想要的。
更新 1:在遵循Aaron Digulla的提示之后,我现在正在尝试使用线程来使其工作。
首先,我尝试在一个线程上运行p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

好的,没有起作用。尝试了其他组合,比如将其更改为.write()p2.read()。仍然不行。现在让我们尝试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码最终会在某个地方阻塞。可能是在生成的线程中,也可能在主线程中,或者两者都存在。所以它没能正常工作。如果您知道如何使其工作,提供工作代码将更加容易。我正在尝试。


更新 2

保罗·杜博伊斯(Paul Du Bois)在下面给出了一些信息,因此我进行了更多测试。 我阅读了整个 subprocess.py 模块并了解了其工作原理。所以我尝试将其应用于代码。

我在Linux上进行测试,但由于我正在测试线程,因此我的第一个方法是复制subprocess.pycommunicate()方法中看到的确切的Windows线程代码,但是针对两个进程而不是一个。以下是我尝试过的所有清单:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

好吧,它没起作用。即使调用了 p1.stdin.close()p2.stdout.read() 仍然会阻塞。

然后我尝试了 subprocess.py 上的 posix 代码:

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

还会阻塞select.select()。通过分散print,我发现了以下内容:
  • 读取是有效的。在执行期间代码多次读取。
  • 写入也有效。数据被写入p1.stdin
  • numwrites结束时,调用p1.stdin.close()
  • select()开始阻塞时,只有to_read有内容,即p2.stdout。此时to_write已经为空。
  • os.read()调用总是返回一些内容,因此从未调用p2.stdout.close()

从两个测试中得出的结论:关闭管道中第一个进程(grep在这个例子中)的stdin并不能使它将其缓冲输出转储到下一个进程并死亡。

没有办法使它工作吗?

PS: 我不想使用临时文件,我已经测试过文件并知道它可以工作。而且我不想使用Windows。


关闭grep的标准输入必须使其输出。如果这种情况没有发生,那么就真的出了很大的问题。 - Aaron Digulla
2
这个问题和你下面的解决方案是绝对的宝藏。感谢您节省了我数小时的工作时间 - 这种信息是使stackoverflow成为如此出色资源的原因。 - Andrew
11个回答

22

我找到了解决方法。

这与线程无关,也与select()无关。

当我运行第一个进程(grep)时,它会创建两个低级文件描述符,分别对应于每个管道。我们称其为ab

当我运行第二个进程时,b被传递给了cut的标准输入。但是在Popen上有一个愚蠢的默认设置-close_fds=False

这样的效果是cut也继承了a。因此,即使我关闭了agrep也无法退出,因为cut的进程仍然打开了stdin(cut会忽略它)。

现在以下代码可以完美运行。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True在Unix系统上应该是默认设置。在Windows系统上,它会关闭所有文件描述符,因此会阻止管道传输。

编辑:

PS:对于阅读本答案的遇到类似问题的人们:正如pooryorick在评论中所说,如果写入到p1.stdin的数据比缓冲区大,那么也可能会阻塞。在这种情况下,您应该将数据分成较小的块,并使用select.select()来知道何时读/写。问题中的代码应该给出如何实现这一点的提示。

编辑2:发现另一个解决方案,得到了pooryorick的更多帮助-而不是使用close_fds=True并关闭所有文件描述符,可以在执行第二个进程时关闭属于第一个进程的fd,这样就能够运行。关闭必须在子进程中完成,因此Popen函数中的preexec_fn函数非常有用。在执行p2时,您可以执行以下操作:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)

3
在Python 3.3中,默认情况下close_fds=True,但在Python 2.7中不是。 - ggg
一个更好的选择是将生产者放在线程中,然后通过从管道的输出端 逐步 读取来驱动管道,类似于 这个例子 - Jed
@Jed:你提供的例子并没有解决问题,因为它没有将一个子进程的输出导入到另一个子进程的输入中,这正是问题产生的原因。像你在例子中所做的那样运行单个进程是可以正常工作的。如果你有运行两个或更多进程并将一个进程的输出导入到另一个进程的输入而不阻塞的示例,请提供。我试图改编你的例子,但它没有起作用。此外,我无法看到它是“更好”的,为什么运行一个线程等待IO是“更好”的呢?这样做可能会与其他东西(如信号或fork())产生冲突,所以最好避免! - nosklo
  1. 你没有说你是如何修改的,但一个非常简单的修改就可以很好地解决问题。你应该能够从详细的答案中找到答案。
  2. select 是较低级别的,而且不可移植(在Windows上)。线程/进程允许你不将逻辑纠缠到选择循环中。当操作系统线程变得过重时,你可以使用 gevent,但在这里它们是一个很好的解决方案。
- Jed
@Jed,唯一“非常简单”的修改是我在这个回答中提到的close_fds=True参数,如果你使用它,你就不需要线程,一切都可以正常工作。 - nosklo
显示剩余3条评论

7

处理大文件

在处理大文件时,需要统一采用两个原则。

  1. 由于任何I/O操作都可能会阻塞线程,因此我们必须将管道的每个阶段放在不同的线程或进程中运行。这个例子中我们使用了线程,但是使用子进程可以避免GIL问题。
  2. 我们必须使用增量读写的方式,这样才不会在等待 EOF 时停止进展。

另一种方法是使用非阻塞I/O,但在标准Python中这很麻烦。可以使用gevent来实现基于非阻塞原语的同步I/O API。

示例代码

我们将构建一个愚蠢的管道,大致如下:

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

每个用大括号{}表示的阶段都是由Python实现,而其他阶段使用标准外部程序。简而言之:请查看此代码片段

我们从预期的导入开始。

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

Python管道的阶段

除了管道中最后一个由Python实现的阶段之外,所有其他阶段都需要在线程中运行,以��免其IO阻塞其他阶段。如果您希望它们实际并行运行(避免GIL),可以改为在Python子进程中运行。

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

每个任务都需要放在自己的线程中,我们可以使用这个方便的函数来实现。

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

创建管道

使用Popen创建外部阶段,使用spawn创建Python阶段。参数bufsize=-1表示使用系统默认缓冲区(通常为4 kiB)。这通常比默认的(无缓冲)或行缓冲快,但如果要实时监视输出而不产生延迟,则需要使用行缓冲。

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

推动流水线

按照上述方式组装,管道中的所有缓冲区都将填满,但由于没有人从末端(grepz.stdout)读取,它们都会阻塞。我们可以通过一次调用grepz.stdout.read()来读取整个内容,但对于大文件来说,这样会占用大量内存。因此,我们需要进行增量读取。

for line in grepz.stdout:
    sys.stdout.write(line.lower())

线程和进程一旦到达 EOF 就会清理。我们可以使用以下方式显式地清理:
for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6及更早版本

subprocess.Popen 内部调用 fork,配置管道文件描述符并调用 exec。从 fork 产生的子进程拥有父进程中所有文件描述符的副本,必须将 两个 副本都关闭后,相应的读取器才能得到 EOF。可以通过手动关闭管道 (使用 close_fds=True 或适当的 preexec_fn 参数传递给 subprocess.Popen) 或设置 FD_CLOEXEC 标志以使 exec 自动关闭文件描述符来解决此问题。在 Python-2.7 及更高版本中自动设置此标志,请参见 issue12786 。我们可以通过调用以下代码在早期版本的 Python 中获得类似于 Python-2.7 的行为:

p._set_cloexec_flags(p.stdin)

在将 p.stdin 作为后续 subprocess.Popen 的参数之前,需要进行一些处理。


我不想让数据在每个步骤中都通过Python进行处理。我希望数据可以直接从一个进程传递到另一个进程。我尝试将您的代码适应到我的问题上,但它仍然卡住了。这是我的尝试:http://bpaste.net/show/NKX5FLaHCskMpO7YnwuS/ 您能否帮助我使用您的方法修复它,以便它不会卡住?--请注意,要重现问题,您必须使用与我相同的环境,即旧版Ubuntu 9.04中的Python 2.6。在Python 2.7最近的Ubuntu中,已经修复了这个问题,并且我的原始代码(问题中的代码)已经可以工作 - nosklo
1
请注意,在我的示例中,grepk 直接进入 grepz。我展示了如何将其扩展到任意管道。我添加了对 FD_CLOEXECissue12786 的解释,现在我明白这就是你最初遇到的问题。缓冲问题仍然存在于你的代码中,并且建议人们使用 select 是不必要的混淆。如果在你的示例中的两个 Popen 调用之间添加 p1._set_cloexec_flag(p1.stdin),它将适用于所有 Python-2.x。 - Jed
谢谢,太好了,_set_cloexec_flag() 私有函数是解决问题的另一种方法。然而,这个答案仍然回答了一个关于大文件的不同问题,这不是我最初提出的问题。我只写了十几个字节,所以在这个程序中我不需要担心缓冲区。如果你愿意编辑它并删除“大文件”部分,只留下对我的问题的回答(答案的最后一部分),我会点赞。否则,我将编辑我的答案并添加你提供的信息。感谢链接到这个问题,它已经非常清楚了。 - nosklo
@nosklo,我不在乎你的点赞。我写这篇答案是因为有太多误导性信息和所选答案脆弱且不易扩展。最好是提出一个新问题并将该答案移动到那里。 - Jed
嗯,我不知道选定答案有什么问题。在您最后一次编辑之前,它是唯一实际回答问题的答案!问题/答案包含的代码是示例代码,用于重现问题并演示修复方法,根本不是处理任何文件的复杂示例。我认为另外提出一个关于大文件或其他问题的问题并在那里回答会是一个好的解决方案,而将这个问题仅留给当FD未正确关闭时所遇到的阻塞情况。 - nosklo

3
Nosklo提供的解决方案如果向管道的接收端写入过多数据,将很快出现问题:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

如果这个脚本在你的机器上没有卡住,只需将“20000”增加到超过操作系统管道缓冲区大小的值即可。这是因为操作系统正在缓冲输入到“grep”的内容,但一旦该缓冲区已满,“p1.stdin.write”调用将会阻塞,直到有东西从“p2.stdout”读取。在玩具场景中,你可以通过在同一进程中写入/读取来完成,但在正常使用中,必须从一个线程/进程中写入并从另一个线程/进程中读取。对于subprocess.popen、os.pipe、os.popen*等,都是如此。
另一个问题是有时候你想持续向管道中添加由同一管道早期输出生成的项目。解决方案是使管道喂食者和管道读取者异步于主程序,并实现两个队列:一个在主程序和管道喂食者之间,另一个在主程序和管道读取者之间。PythonInteract 就是这样的一个例子。

Subprocess是一个很方便的模型,但因为它在底层隐藏了os.popen和os.fork调用的细节,所以有时比它使用的低级别调用更难处理。因此,subprocess不是学习进程间管道如何工作的好方法。


你可以通过在管道上使用select.select()来从同一进程/线程中完成所有操作,真的不需要使用线程、进程或队列。我的答案没有涉及到这个问题,只是为了简单起见,但在问题中有一个如何实现的示例。至于最后一段 - 并不是每个人都想学习进程间管道的工作原理。他们只想让它们正常工作并且不影响其他操作。这就是为什么像subprocess.Popen这样的高级构造被创建出来。它们应该适用于大多数常见用例,而无需用户了解任何知识 - 这就是整个重点 - nosklo
即使使用线程,如果没有 close_fds=True,脚本也会挂起。请注意。 - nosklo
1
对不起,但是你又错了。要让你的select.select示例卡住,只需在"write"调用中溢出缓冲区即可: <code>p1.stdin.write('hello world!\n' * 5000); p1.stdin.flush()</code>。我向你挑战,要么提供一个使用select.select处理任意输入和管道缓冲的单线程示例,要么停止传播错误信息并赞同我的答案,因为它们纠正了你的错误。你是否知道每次调用Popen,都会创建一个或多个单独的线程/进程。你的玩具答案在真实世界中不起作用,反而导致困惑。 - Poor Yorick
http://paste.pocoo.org/show/176123/ -> 例子不会挂起。 它通过分块写入任意大小的输入,并且只有在 select.select 告诉您可以写入时才写入,因此它永远不会阻塞。 在这个例子中,没有线程被创建,从来没有,甚至在 Popen 中也没有。 当然,每个 Popen 都会创建一个新进程,因为,正如我之前所说,这就是 Popen 的全部意义。现在您的挑战已经完成,我现在向您提出挑战,提供使用线程的 subprocess.Popen PIPE 解决方案,不会阻塞,并且不使用 close_fds=True(这是真正的解决方案)。 - nosklo
你发布的代码存在致命缺陷。请查看我在回答中对发布的代码的解释。请注意,我已经使用os.fork而不是subprocess.Popen来完成您的挑战:http://pypi.python.org/pypi/pipeline/0.1。相同的原则适用于subprocess.Popen。 - Poor Yorick
我修复了我的代码,修复后的代码在另一个答案中。如果你在其他库中修复了它,我并不在意,因为问题非常具体,涉及到“subprocess.Popen”,所以如果你不知道答案,请闭嘴。我通常会对所有没有任何回答问题的答案进行负评。 - nosklo

3

让管道正常工作有三个主要技巧:

  1. 确保每个管道端点在不同的线程/进程中使用(顶部附近的一些示例存在此问题)。

  2. 在每个进程中明确关闭未使用的管道端点。

  3. 通过禁用缓冲区(Python -u选项)、使用pty或仅使用不会影响数据的内容(可能是'\n',但任何适合的内容都可以)来处理缓冲区。

Python“pipeline”模块中的示例(我是作者)完全符合您的情况,并使低级步骤相当清晰。

http://pypi.python.org/pypi/pipeline/

最近,我将subprocess模块作为生产者-处理器-消费者-控制器模式的一部分使用:

http://www.darkarchive.org/w/Pub/PythonInteract

此示例处理带缓冲的stdin,而无需使用pty,并说明应在何处关闭管道端点。我更喜欢进程而不是线程,但原则是相同的。此外,它还说明了如何同步队列,这些队列为生产者提供输入并从消费者收集输出,并如何干净地关闭它们(注意插入到队列中的哨兵)。此模式允许基于最近的输出生成新输入,从而实现递归发现和处理。


你不需要线程。对于运行管道这样的简单事情,要求使用线程是荒谬的。问题早就得到了解决,几个月前我在我的答案中已经解决了 - 原因是 close_fds=True 导致了问题。 - nosklo
1
你需要使用线程或进程。Subprocess.Popen在后台执行它们,因此您看不到它们。您最初遇到了很多麻烦,因为您不理解通过管道与另一个进程通信的原理,这就是为什么我发布了简洁说明细节的示例。您声称close_fds应该是Unix系统上的默认设置,这表明您仍然不太了解管道,或者至少没有考虑过可能的情况。 - Poor Yorick
1
此外,对于除了最琐碎的任务之外的所有任务,“通信”都不是将数据发送到管道的足够充分的机制——特别是如果您试图从另一端消耗数据。为了获得更强大的解决方案,您需要像我第二个示例中的队列一样的东西。此外,您“解决方案”示例能够工作的唯一原因是您被缓冲保存了,您可能甚至没有意识到。如果您向p1.stdin写入更多数据,它会再次挂起。看到您的解决方案示例的人们将会误解如何正确地执行此操作。 - Poor Yorick
我的回答只是一个简单的例子 - 如果需要写多于缓冲区允许的内容,那么应该使用select.select()来确切地知道何时可以在不阻塞的情况下读取或写入,正如我在问题中的尝试中所做的那样。我仍然不需要线程、进程或队列。普通的select,单个进程/线程即可。至于close_fds=True - 它应该是默认值,因为它会导致 WTFs。正如你所说,Popen实现隐藏了细节,所以默认情况下不应将fd传递给子进程。如果需要,应该显式地进行传递。 - nosklo
1
现在很明显你也不理解如何使用select.select。请看我上面的解释。你只是为了保护自己而对我的回答进行负评。 - Poor Yorick
显示剩余3条评论

2

回应nosklo在这个问题的其他评论中提出的断言,即没有close_fds=True就无法完成:

只有在您保留了其他文件描述符时才需要使用close_fds=True。当打开多个子进程时,始终要跟踪可能会被继承的打开文件,并显式关闭不需要的任何文件:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"
close_fds默认为False,因为子进程更愿意相信调用程序知道如何处理打开的文件描述符,并提供一个简单的选项来关闭它们。

但是真正的问题在于,除了玩具示例之外,管道缓冲区会影响到你。正如我在回答这个问题的其他答案中所说的那样,经验法则是不要让读取器和写入器在同一进程/线程中打开。任何想要使用子进程模块进行双向通信的人都应该先学习 os.pipe 和 os.fork。如果你有一个好的示例参考,它们实际上并不难使用。


好的,那并不是一个真正的管道,因为你在启动另一个进程之前关闭了一侧。即使如此,如果您更改代码以写入更多数据(比如 p1.stdin.write('Hello World\n' * 100000)),它也会阻塞。您说您需要将读取器和写入器分别放在两个进程中,但是您仍然没有提供使用 subprocess.Popen 成功执行它的任何代码。这就是问题所在。如果您知道答案,请回答。如果您不知道,请停止说“社区 wiki”之类的废话。 - nosklo
1
这说明您也不理解管道是什么。而且,这段代码确切地做了您的示例所做的事情,因此如果它不是一个管道,那么您的示例也不是。对于想要与另一个进程进行双向通信,特别是如果管道未来的输入取决于观察到的管道输出时,subprocess模块是不够的。Shell管道之美在于它们通过像subprocess一样缓冲输出来避免占用内存。您不需要使用subprocess.Popen显式地实例化多个进程,因为它在幕后使用os.fork来实现这一点。 - Poor Yorick
Popen会进行分叉,但它使用os.execvpe,因此最终为每个Popen创建单个新进程,正如它应该的那样。 - nosklo

2

你必须使用多个线程来完成此操作,否则你会陷入一种情况:子进程 p1 将无法读取你的输入数据,因为进程 p2 没有读取进程 p1 的输出结果,导致你无法发送数据。

因此,你需要一个后台线程来读取进程 p2 写出的内容。这将允许 p2 在写入一些数据到管道后继续执行,以便它可以从 p1 读取下一行输入,从而使 p1 能够处理你发送给它的数据。

或者,你可以使用后台线程向 p1 发送数据,并在主线程中读取 p2 的输出。但这两端都必须是一个线程。


谢谢,但那不起作用。我已经更新了我的问题,展示了我尝试过的内容。 - nosklo
1
在这种情况下,您不能使用communicate()。您必须读取和写入各个管道。 - Aaron Digulla
1
@nosklo .read() 是一个阻塞调用,它试图一直读取到 EOF(当没有参数调用时)。你需要逐步读取,就像 for line in output: ... - Jed
1
@nosklo 这是我的详细答案,适用于大文件。https://dev59.com/AnI-5IYBdhLWcg3w9thw#14061132 - Jed
@Jed 谢谢,但我不是因为大文件而有问题。即使只有一个字节,采用您的解决方案也会导致程序挂起。请查看您答案下的评论以获取更多详细信息。 - nosklo
显示剩余2条评论

1
在上面的评论中,我向nosklo提出了挑战,要求他发布一些代码来支持他关于select.select的断言,或者点赞我的回复,因为他之前曾经踩过我。他回应道:
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

这个脚本的一个问题是它猜测系统管道缓冲区的大小/性质。如果能够消除像1024这样的魔数,脚本将遇到更少的故障。

最大的问题是,这个脚本代码只有在正确的数据输入和外部程序组合下才能一致地工作。grep和cut都使用行,因此它们的内部缓冲区的行为略有不同。如果我们使用更通用的命令,比如“cat”,并将较小的数据写入管道,致命的竞争条件将更频繁地出现:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

在这种情况下,将会呈现出两个不同的结果:
write, write, close file, read -> success
write, read -> hang

所以,我再次向nosklo发起挑战,要么发布代码展示如何使用select.select来处理任意输入和管道缓冲,要么就点赞我的回复。

底线:不要尝试从单个线程操纵管道的两端。这根本不值得。请参见pipeline,了解如何正确地执行此操作的一个很好的低级示例。


1
你的回答无法获得赞同,因为它们没有回答问题。你一直在吹嘘“必须使用线程”,却没有提供任何回答问题的Popen代码,让我感到厌倦。我通过调整一些语句来修复了我的代码错误,http://paste.pocoo.org/show/176561/,然后改进了它以便于测试。现在它测试了很多缓冲区大小和数据大小的组合,并且每个测试都重复了50次,**所有测试都正常工作**,而且没有使用线程。所以没有“魔数”。我仍在等待你的回答。 - nosklo
这是一个更好的select.select示例。它仍然包含一个不必要的close()函数--学习如何使用os.fork和os.pipe是掌握这些问题的最佳方法。魔数仍然存在--你只是将其参数化了。这个示例将你的有效缓冲区从(通常)64k减少到1k甚至更少,因此性能会受到影响。下一步:你如何确保整个输出恰好通过同一个管道返回一次? - Poor Yorick
关于Popen的重点在于它使用os.fork和os.exec,因此您已经在处理线程/进程了--只是需要知道如何操作管道。 - Poor Yorick
最后,对于多消费者/多生产者场景,select.select更为合适。它对于这种情况来说过于复杂了。如果想要一个更简单的解决方案(不使用close_fds),请参考我在这个问题中提供的os.fork答案。 - Poor Yorick

1

我认为你可能在考虑错误的问题。正如Aaron所说,如果你试图同时成为管道开头的生产者和管道末尾的消费者,很容易陷入死锁状态。这就是communicate()解决的问题。

对于你来说,communicate()并不完全正确,因为stdin和stdout位于不同的子进程对象上;但是,如果你查看subprocess.py中的实现,你会发现它确实做到了Aaron建议的那样。

一旦你看到communicate()既读又写,你就会发现在你的第二次尝试中,communicate()与p2竞争p1的输出:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

我正在运行在win32上,它肯定具有不同的I/O和缓冲特性,但这对我有效:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

我调整了输入大小,以便在使用天真的未线程化的p2.read()时产生死锁。

您也可以尝试将缓冲区写入文件中,例如

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

这对我而言也可以避免死锁问题。


谢谢!!! 我已经检查了 subprocess.py 并尝试按照您说的做,但它仍然阻塞。我已更新我的问题。救命啊!!! - nosklo

0

当您调用fileno()方法(Popen需要这样做)时,SpooledTemporaryFile会转换为真实文件,因此它与使用文件相同-这就破坏了使用管道的初衷。 :( - nosklo

-1
这是一个使用Popen和os.fork一起完成相同任务的示例。它不使用close_fds,而是在正确的位置关闭管道。比尝试使用select.select简单得多,并充分利用系统管道缓冲区。
from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()

嗯,这很有道理。但是它不必要地启动了3个新进程,而不是2个。因此,我会避免使用这个解决方案。 - nosklo
1
谢谢您的答案,我借鉴了它的帮助,找到了另一个解决方案,每个“Popen”只创建一个新进程,不需要“close_fds=True” - 使用“preexec_fn”在复制子进程之前关闭fds。我已经编辑了我的答案。 - nosklo

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接