I want to run several processes in parallel and grab their stdout as it becomes available. How should I do that? Do I need to run a thread for each subprocess.Popen() call?
#!/usr/bin/env python
# file: child.py
import os
import random
import sys
import time

for i in range(10):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.random())
If you want to collect output as soon as it becomes available, you can use select on POSIX systems, as @zigg suggested:
#!/usr/bin/env python
from __future__ import print_function
from select import select
from subprocess import Popen, PIPE

# start several subprocesses
processes = [Popen(['./child.py', str(i)], stdout=PIPE,
                   bufsize=1, close_fds=True,
                   universal_newlines=True)
             for i in range(5)]

# read output
timeout = 0.1  # seconds
while processes:
    # remove finished processes from the list (O(N**2))
    for p in processes[:]:
        if p.poll() is not None:  # process ended
            print(p.stdout.read(), end='')  # read the rest
            p.stdout.close()
            processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout for p in processes], [], [], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='')  # NOTE: it can block
A more portable solution (it works on Windows, Linux, and OS X) is to use a reader thread for each process; see Non-blocking read on a subprocess.PIPE in Python.
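A minimal sketch of that thread-per-process approach: each reader thread funnels one child's lines into a shared queue that the main thread drains. To keep the sketch self-contained, an inline -c script stands in for the child.py helper above (that substitution is an assumption of this example, not part of the original answers).

```python
import sys
from queue import Queue
from subprocess import Popen, PIPE
from threading import Thread

# Inline stand-in for child.py so this sketch runs on its own.
CHILD = "import sys\nfor i in range(3): print(sys.argv[1], i)\n"

def reader(pipe, queue):
    # Drain one child's stdout into the shared queue, then send a sentinel.
    for line in pipe:
        queue.put(line)
    pipe.close()
    queue.put(None)

q = Queue()
processes = [Popen([sys.executable, '-c', CHILD, str(i)],
                   stdout=PIPE, universal_newlines=True)
             for i in range(5)]
threads = [Thread(target=reader, args=(p.stdout, q)) for p in processes]
for t in threads:
    t.start()

done = 0
lines = []
while done < len(processes):
    item = q.get()  # blocks until some child produces a line or exits
    if item is None:
        done += 1  # one more child has finished
    else:
        lines.append(item)
        print(item, end='')

for p in processes:
    p.wait()
for t in threads:
    t.join()
```

Because blocking reads happen only in the threads, the main thread never stalls on a quiet child, and this works the same on Windows, where select() cannot be used on pipes.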
Here is a solution based on os.pipe() that works on both Unix and Windows:
#!/usr/bin/env python
from __future__ import print_function
import io
import os
import sys
from subprocess import Popen

ON_POSIX = 'posix' in sys.builtin_module_names

# create a pipe to get data
input_fd, output_fd = os.pipe()

# start several subprocesses
processes = [Popen([sys.executable, 'child.py', str(i)], stdout=output_fd,
                   close_fds=ON_POSIX)  # close input_fd in children
             for i in range(5)]
os.close(output_fd)  # close unused end of the pipe

# read output line by line as soon as it is available
with io.open(input_fd, 'r', buffering=1) as file:
    for line in file:
        print(line, end='')

for p in processes:
    p.wait()
Note: write operations of up to PIPE_BUF bytes are atomic; otherwise data from multiple processes may interleave. POSIX requires PIPE_BUF to be at least 512 bytes; on Linux, PIPE_BUF is 4096 bytes. - jfs

If a child's output does not appear promptly (hence the sys.stdout.flush() in child.py), it may be a block-buffering issue in the child process; see Python C program subprocess hangs at "for line in iter". - jfs

You can also use twisted to collect stdout from several subprocesses at the same time:
#!/usr/bin/env python
import sys
from twisted.internet import protocol, reactor

class ProcessProtocol(protocol.ProcessProtocol):
    def outReceived(self, data):
        # received chunk of stdout from child (bytes)
        print(data.decode(), end='')

    def processEnded(self, status):
        global nprocesses
        nprocesses -= 1
        if nprocesses == 0:  # all processes ended
            reactor.stop()

# start subprocesses
nprocesses = 5
for i in range(nprocesses):
    reactor.spawnProcess(ProcessProtocol(), sys.executable,
                         args=[sys.executable, 'child.py', str(i)],
                         usePTY=True)  # can change how child buffers stdout
reactor.run()
This lets you watch the stdout streams without blocking on them, reading from each only when it has data available.

You could call p = subprocess.Popen(…) and then print p.communicate()[0], but communicate() waits until the process ends. - sashab

Besides communicate(), there are other ways to get stdout. - Amber

On Windows, select() only accepts sockets, so any pipe-based select() code is not portable there. - André Caron

You can also wait for process.poll() to complete while concurrently running other things:

import time
import sys
from subprocess import Popen, PIPE

def ex1() -> None:
    command = 'sleep 2.1 && echo "happy friday"'
    proc = Popen(command, shell=True, stderr=PIPE, stdout=PIPE)
    while proc.poll() is None:
        # do stuff here
        print('waiting')
        time.sleep(0.05)
    out, _err = proc.communicate()
    print(out, file=sys.stderr)
    sys.stderr.flush()
    assert proc.poll() == 0

ex1()
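For contrast with the polling loop above, here is the communicate() approach mentioned in the comments: it simply blocks until the child exits and then returns all output at once, so it cannot interleave output from several children, but it is the simplest option when you do not need output incrementally. A minimal sketch (the inline -c child is an assumption of this example):

```python
import sys
from subprocess import Popen, PIPE

# Spawn a single child that prints one line and exits.
proc = Popen([sys.executable, '-c', 'print("happy friday")'],
             stdout=PIPE, stderr=PIPE, universal_newlines=True)
out, err = proc.communicate()  # blocks until the process finishes
print(out, end='')
```

Note that communicate() also sets proc.returncode, so there is no separate poll() or wait() step afterwards.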