Python子进程并行执行

23

我希望能够并行运行多个进程,并随时获取stdout。我应该如何做?我需要为每个subprocess.Popen()调用运行线程吗?


可能是如何使用Python运行多个可执行文件?的重复问题。 - André Caron
相关:以下是如何同时运行多个shell命令(并可选择捕获它们的输出)的方法。 - jfs
4个回答

24
你可以在单线程中完成它。
假设你有一个随机时间打印行的脚本:
#!/usr/bin/env python
#file: child.py
import os
import random
import sys
import time

for i in range(10):
    print("%2d %s %s" % (int(sys.argv[1]), os.getpid(), i))
    sys.stdout.flush()
    time.sleep(random.random())

如果您希望在输出可用时立即收集输出,您可以在POSIX系统上使用select,正如@zigg建议的那样

#!/usr/bin/env python
from __future__ import print_function
from select     import select
from subprocess import Popen, PIPE

# start several subprocesses
processes = [Popen(['./child.py', str(i)], stdout=PIPE,
                   bufsize=1, close_fds=True,
                   universal_newlines=True)
             for i in range(5)]

# read output
timeout = 0.1 # seconds
while processes:
    # remove finished processes from the list (O(N**2))
    for p in processes[:]:
        if p.poll() is not None: # process ended
            print(p.stdout.read(), end='') # read the rest
            p.stdout.close()
            processes.remove(p)

    # wait until there is something to read
    rlist = select([p.stdout for p in processes], [],[], timeout)[0]

    # read a line from each process that has output ready
    for f in rlist:
        print(f.readline(), end='') #NOTE: it can block

更加便携的解决方案(适用于Windows、Linux、OSX)可以为每个进程使用读取线程,请参见Python中对subprocess.PIPE进行非阻塞读取

这是一个基于os.pipe()的解决方案,适用于Unix和Windows:

#!/usr/bin/env python
from __future__ import print_function
import io
import os
import sys
from subprocess import Popen

ON_POSIX = 'posix' in sys.builtin_module_names

# create a pipe to get data
input_fd, output_fd = os.pipe()

# start several subprocesses
processes = [Popen([sys.executable, 'child.py', str(i)], stdout=output_fd,
                   close_fds=ON_POSIX) # close input_fd in children
             for i in range(5)]
os.close(output_fd) # close unused end of the pipe

# read output line by line as soon as it is available
with io.open(input_fd, 'r', buffering=1) as file:
    for line in file:
        print(line, end='')
#
for p in processes:
    p.wait()

3
在你的最新解决方案中,你似乎将所有子进程的标准输出复用到一个文件描述符(output_fd)中。如果两个子进程同时打印输出,那么输出是否会混乱(例如'AAA\n' + 'BBB\n' -> 'ABBB\nAA\n')? - dan3
2
@dan3:这是一个合理的担忧。少于PIPE_BUF字节的write操作是原子性的。否则,来自多个进程的数据可能会交错。POSIX要求至少512字节。在Linux上,PIPE_BUF为4096字节。 - jfs
我尝试了os.pipe的解决方案,它非常有效,但是当我尝试添加进度条(例如:对于每一行我获得,我增加一个数字并打印该数字)时,我注意到当所有行都完成时,它们似乎同时出现。有什么方法可以避免这种行为吗? - FORTRAN
@Gunnar,你是否使用答案中的代码一次性获取了“所有行”?(不应该:请注意子进程中的sys.stdout.flush())。这可能是您的子进程中的块缓冲问题,请参见Python C程序子进程在“for line in iter”处挂起 - jfs
1
@J.F.Sebastian 我自己写了一个子程序(但是忘记刷新了),但是使用刷新后它很好地工作了,谢谢。 - FORTRAN
显示剩余3条评论

6

您还可以使用twisted同时从多个子进程中收集stdout:

#!/usr/bin/env python
import sys
from twisted.internet import protocol, reactor

class ProcessProtocol(protocol.ProcessProtocol):
    def outReceived(self, data):
        print data, # received chunk of stdout from child

    def processEnded(self, status):
        global nprocesses
        nprocesses -= 1
        if nprocesses == 0: # all processes ended
            reactor.stop()

# start subprocesses
nprocesses = 5
for _ in xrange(nprocesses):
    reactor.spawnProcess(ProcessProtocol(), sys.executable,
                         args=[sys.executable, 'child.py'],
                         usePTY=True) # can change how child buffers stdout
reactor.run()

请参阅Twisted中的使用进程

4
您不需要为每个进程运行一个线程。您可以查看每个进程的stdout流,而无需在其上阻塞,并且只有在它们有可读数据时才从中读取。
但是,如果您没有打算这样做,您必须小心不要意外地阻塞它们。

我会多次执行 p = subprocess.Popen(…),然后再执行 print p.communicate()[0]。但是 communicate() 会一直等待进程结束。 - sashab
1
是的,这就是为什么如果您想使用单个线程,就不能使用communicate()。除了communicate()之外,还有其他获取stdout的方法。 - Amber
2
你可能需要查看select模块,以便一次等待多个子进程。 - zigg
@zigg:请注意,在Windows上,select()仅接受套接字,任何基于管道的select()代码都不具备可移植性。 - André Caron

0
你可以等待 process.poll() 完成,同时并发地运行其他东西:
import time                                                                                                                                                                                                                                                                   
import sys                                                                                                                                                                                                                                                                    
from subprocess import Popen, PIPE        
                                                                                                                                                                                                                                
def ex1() -> None:                                                                                                                                                                                                                                                            
    command = 'sleep 2.1 && echo "happy friday"'                                                                                                                                                                                                                              
    proc = Popen(command, shell=True, stderr=PIPE, stdout=PIPE)                                                                                                                                                                                                               
    while proc.poll() is None:                                                                                                                                                                                                                                                
        # do stuff here                                                                                                                                                                                                                                                       
        print('waiting')                                                                                                                                                                                                                                                      
        time.sleep(0.05)                                                                                                                                                                                                                                                      
                                                                                                                                                                                                                                                                              
    out, _err = proc.communicate()                                                                                                                                                                                                                                            
    print(out, file=sys.stderr)                                                                                                                                                                                                                                               
    sys.stderr.flush()                                                                                                                                                                                                                                                        
    assert proc.poll() == 0                                                                                                                                                                                                                                                   
                                                                                                                                                                                                                                                                              
ex1()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接