等待多个并行作业的子进程结束

13

我正在使用Python并行运行一些子进程。我想要等待每个子进程都完成后再继续执行程序。目前我采用的解决方案不够优雅:

runcodes = ["script1.C", "script2.C"]
ps = []
for script in runcodes:
  args = ["root", "-l", "-q", script]
  p = subprocess.Popen(args)
  ps.append(p)
while True:
  ps_status = [p.poll() for p in ps]
  if all([x is not None for x in ps_status]):
    break

有没有一个类可以处理多个子进程?问题在于wait方法会阻塞我的程序。

更新:我想在计算过程中显示进度:例如“已完成4/7个子进程...”

如果你好奇,可以编译并执行C++脚本。

5个回答

11

如果您的平台不是Windows,您可能会选择针对子进程的stdout管道进行选择。 然后,您的应用程序将阻塞,直到以下情况之一发生:

  • 已注册的文件描述符之一具有I/O事件(在这种情况下,我们对子进程的stdout管道上的挂起感兴趣)
  • 轮询超时

使用Linux 2.6.xx的epoll的非详细示例:

import subprocess
import select

poller = select.epoll()
subprocs = {} #map stdout pipe's file descriptor to the Popen object

#spawn some processes
for i in xrange(5):
    subproc = subprocess.Popen(["mylongrunningproc"], stdout=subprocess.PIPE)
    subprocs[subproc.stdout.fileno()] = subproc
    poller.register(subproc.stdout, select.EPOLLHUP)

#loop that polls until completion
while True:
    for fd, flags in poller.poll(timeout=1): #never more than a second without a UI update
        done_proc = subprocs[fd]
        poller.unregister(fd)
        print "this proc is done! blah blah blah"
        ...  #do whatever
    #print a reassuring spinning progress widget
    ...
    #don't forget to break when all are done

这很不错!在mylongrunningproc运行时,有没有办法让subproc.stdout打印到终端? - unutbu
1
首先想到的是为输入事件注册stdout管道 - poller.register(subproc.stdout, select.EPOLLHUP | select.EPOLLIN)。然后你可以使用 if flags & select.EPOLLIN: print done_proc.stdout.readline()。但是,如果输出没有按行分隔符,则必须小心无限期阻塞的情况。在Linux中,我认为您可以通过使用fcntl将stdout管道设置为非阻塞,并使用errno = EAGAIN捕获IOError来解决此问题。 示例 - fcntl.fcntl(subproc.stdout.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) - Jeremy Brown
管道非阻塞读取的参考链接 - (http://www.gossamer-threads.com/lists/python/dev/658205) - Jeremy Brown
如果解决方案基于select系统调用,那么更好的高级抽象可能是asyncio,它最终也是基于select的。我会为此创建另一个答案。 - turbopapero

8

怎么样?

import os,subprocess
runcodes = ["script1.C", "script2.C"]
ps = {}
for script in runcodes:
    args = ["root", "-l", "-q", script]
    p = subprocess.Popen(args)
    ps[p.pid] = p
print("等待%d个进程..." % len(ps))
while ps:
    pid, status = os.wait()
    if pid in ps:
        del ps[pid]
        print("等待%d个进程..." % len(ps))

注意:os.wait() 需要 Linux。 - Massimo

7
您可以像这样做:

您可以像这样做:

runcodes = ["script1.C", "script2.C"]

ps = []
for script in runcodes:
    args = ["root", "-l", "-q", script]
    p = subprocess.Popen(args)
    ps.append(p)

for p in ps:
    p.wait()

进程将并行运行,您将在最后等待所有进程完成。

5
是的,问题在于我无法在执行期间写入 #process finished,因为假设第一个子进程非常缓慢,变量 p 等于第一个 ps 命令,并且 Python 会一直等待并冻结,直到第一个进程完成;Python 无法写下除第一个进程外的所有子进程已经完成。 - Ruggero Turra
如果我想要等待第一个进程完成后再杀死另一个进程,该怎么做? - andreykyz

2
这个答案与此答案相关,并使用类似的机制(基于select系统调用)称为asyncio。您可以在此处了解有关asyncio的更多信息。
当您的进程受IO限制时,Asyncio非常适用。至少在程序的这个部分中,您的进程似乎受到IO限制,大部分时间都在等待外部脚本完成,只有在它们结束时才打印一条消息。
以下代码应该适合您(可能需要进行一些小的调整):
import asyncio

# The scripts you want to run concurrently
runcodes = ["script1.C", "script2.C"]

# An awaitable coroutine that calls your script
# and waits (non-blocking) until the script is done 
# to print a message
async def run_script(script):
    # You will need to adjust the arguments of create_subprocess_exec here
    # according to your needs
    p = await asyncio.create_subprocess_exec(script)
    await p.wait()
    print("Script", script, "is done")

# You create concurrent tasks for each script
# they will start in parallel as soon as they are
# created
async def main():
    tasks = []
    for script in runcodes:
        tasks.append(asyncio.create_task(run_script(script)))

    # You wait until all the tasks are done before 
    # continuing your program
    for task in tasks:
        await task

if __name__ == "__main__":
    asyncio.run(main())

详细解释

Asyncio 可以让你通过交替各个异步任务并在它们都被阻塞时等待,来用单个线程执行并发任务。

run_script函数是异步的,并且会使用类似于subprocess.Popen的机制调用你的脚本。这里的区别是返回的对象是可等待的(awaitable),意味着当等待函数被阻塞时,你可以跳到别的地方。

你可以在这里了解更多关于使用 asyncio 管理子进程的内容。你会发现,处理子进程的方式与通常的 Python 子进程非常相似。Popen 的参数也很类似。

请注意,这与线程不同。事实上,这个程序是单线程的。不要混淆 asyncio 和多线程。它们是并发运行任务的两种不同方法(具有优缺点)。

主函数将创建多个任务,每个任务用于运行你想要运行的一个脚本,并等待它们完成。

重要的是,这个await不会阻塞,同时它也不会进行实时轮询。它将休眠,直到任何一个任务准备就绪。一旦任务就绪,执行将返回到该任务,该任务可以打印有关你的消息的语句。

程序将不会退出主函数,直到所有等待的任务都完成。它将在由asyncio.run 生成的等待函数循环内保持,直到所有东西都完成。


0

我认为答案不在 Python 代码或语言特性中,而是在系统功能中,考虑这个解决方案:

runcodes = ["script1.C", "script2.C"]

Args = []
for script in runcodes:
    Args += " ".join(["root", "-l", "-q", script])


P = subprocess.Popen(" & ".join(Args))
P.wait()

依赖于系统能力?不太可移植。 - Ruggero Turra
是的,这里不具备可移植性,但我认为只需要两行代码就可以实现。抱歉,我是Linux用户。 - Mhadhbi issam
检测操作系统,然后将“&”更改为Windows等效字符。 - Mhadhbi issam

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接