等待第一个子进程完成

6

我有一个subprocess进程列表,我不与它们通信,只是等待。

我希望等待第一个进程完成(这个解决方案有效):

import subprocess

a = subprocess.Popen(['...'])
b = subprocess.Popen(['...'])

# wait for the first process to finish
while True:
    over = False
    for child in {a, b}:
        try:
            rst = child.wait(timeout=5)
        except subprocess.TimeoutExpired:
            continue  # this subprocess is still running

        if rst is not None:  # subprocess is no more running
            over = True
            break  # If either subprocess exits, so do we.
    if over:
        break

我不想使用os.wait(),因为它可能会从另一个不在我等待列表中的subprocess返回。

一种优雅的解决方案可能是使用epoll或select而不需要任何循环。


为什么不使用.communicate - Chase
我认为你的解决方案非常优雅。当进程完成时(仅在进程完成时),你能否让它们向stdout或stderr写入一些内容? - Roy2012
还有一个问题 - 你会考虑使用 https://pypi.org/project/psutil/ 这样的东西吗? - Roy2012
子进程不会写入标准输出或标准错误流,我真的不想与它们通信。我之前不知道 psutil,我会去看一下,或者你能否提供一个使用这个库的解决方案?谢谢。 - raphaelauv
@raphaelauv - 看下面的回答。显然,他们提供了一个特定的函数来处理这种用例。 - Roy2012
5个回答

4

您可以使用os.wait()来实现。只需循环调用该函数,直到它告诉您其中一个您关心的进程已经退出。

import subprocess

a = subprocess.Popen(['...'])

b = subprocess.Popen(['...'])


# wait for the first process to finish
watched_pids = set(proc.pid for proc in (a, b))
while True:
    pid, _ = os.wait()
    if pid in watched_pids:
        break

os.wait() 的一个隐藏副作用是你会失去该进程的退出码。在 os.wait() 完成后,该退出码将变为 None,如果稍后调用 proc.wait()proc.poll()proc.communicate(),它们将无法找到返回码并默认为 0。虽然可以手动设置,但这有点不正规。

def wait_and_handle_exitstatus(all_procs):
    pid, status = os.wait()
    for proc in all_procs:
        if proc.pid == pid:
            # We need to set the process's exit status now, or we
            # won't be able to retrieve it later and it will be
            # assumed to be 0.
            # This is a kind of hacky solution, but this function has existed
            # ever since subprocess was first included in the stdlib and is
            # still there in 3.10+, so it *should* be pretty stable.
            proc._handle_exitstatus(status)
    return pid, status

你可以使用第一个代码块,只需将os.wait()替换为 wait_and_handle_exitstatus(ALL_PROCS)。但是,你必须向wait_and_handle_exitstatus传递一个包含所有可能正在运行且你可能关心返回码的子进程(Popen对象)的列表,这样它才能找到该进程并设置其退出码。


3

以下是使用psutil解决此问题的方法 - 该工具专门针对此用例:

import subprocess
import psutil

a = subprocess.Popen(['/bin/sleep', "2"])

b = subprocess.Popen(['/bin/sleep', "4"])

procs_list = [psutil.Process(a.pid), psutil.Process(b.pid)]

def on_terminate(proc):
     print("process {} terminated".format(proc))

# waits for multiple processes to terminate
gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate)

或者,如果您想要一个循环等待其中一个进程完成:

while True: 
    gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate) 
    if len(gone)>0: 
        break

这个回答解决了你的问题吗? - Roy2012
似乎在做这件事,但是文档并不能保证如果至少有一个进程消失了,这个调用会返回。这个函数是否可能返回一个空的已消失结构体? - raphaelauv
可以的。如果你把超时时间改成更低的数字(例如1),你会得到一个空的“gone”结构体。顺便说一下,如果这回答了你的问题,能否接受我的答案就太好了。 - Roy2012
如果这回答解决了你的问题,如果你能接受它,那就太好了。 - Roy2012
我的编辑被拒绝了,但正确的答案应该是:while True: gone, alive = psutil.wait_procs(procs_list, timeout=3, callback=on_terminate) if len(gone)>0: break - raphaelauv
编辑并添加了您的循环。再次提醒 - 如果它回答了您的问题,如果您能接受它以供后代参考将是非常棒的。 - Roy2012

1
如果您不需要从进程中获取输出,Popen.poll() 似乎是检查它们是否完成的最简单方法。下面的 while True 循环仅用于演示目的:您可以决定在更大的程序中如何执行此操作(例如,在单独的线程中进行检查,在程序的其他工作之间进行检查等)。
from subprocess import Popen
from time import sleep

ps = [
    Popen(['sleep', t])
    for t in ('3', '5', '2')
]

while True:
    exit_codes = [p.poll() for p in ps]
    print(exit_codes)
    if any(ec is not None for ec in exit_codes):
        break
    else:
        sleep(1)

演示输出:

[None, None, None]
[None, None, None]
[None, None, 0]

Poll 有一个问题。在某些情况下,它会返回错误的结果。 - Roy2012
@Roy2012 很好知道。这个问题有文档记录吗? - FMc
是的 - 但我太懒了,没去找 :) 这里有一个链接:https://lists.gt.net/python/bugs/633489 - Roy2012
我的编辑被拒绝了,但对于这个问题的正确答案应该是:如果任何一个退出码(ec)不是None: 就跳出循环(break) 否则: 等待1秒(sleep(1)) - raphaelauv
1
问题是等待第一个子进程完成,而不是所有进程。这就是我想纠正你的答案的原因。 - raphaelauv

1
使用 asyncio.waitasyncio.as_completed:
import asyncio

async def example():
    p1 = await asyncio.create_subprocess_exec("sleep", "1")
    p2 = await asyncio.create_subprocess_exec("sleep", "2")
    p1_run = asyncio.create_task(p1.wait())
    p2_run = asyncio.create_task(p2.wait())
    pending = [p1_run, p2_run]
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        if p1_run in done:
            print("p1 finished, with status: ", p1.returncode)
        if p2_run in done:
            print("p2 finished, with status: ", p2.returncode)

asyncio.get_event_loop().run_until_complete(example())

为避免重复执行p1和p2中的哪一个,通常会出现更复杂的将px_run映射到px的方式。
为避免这种情况,另一种选择是将任务包装在像下面的"wait_and_return_original"中,下一个示例还使用了更方便的"asyncio.as_completed"。
async def wait_and_return_original(proc: asyncio.subprocess):
    await proc.wait()
    return proc

async def example2():
    p1 = await asyncio.create_subprocess_exec("sleep", "1")
    p2 = await asyncio.create_subprocess_exec("sleep", "2")
    
    for p in asyncio.as_completed([wait_and_return_original(p) for p in [p1, p2]]):
        p_completed = await p   # NOTE: for-loop iteration variable doesn't decide which task is first completed until here!
        if p_completed is p1:
            print("p1 finished, with status: ", p1.returncode)
        if p_completed is p2:
            print("p2 finished, with status: ", p2.returncode)

asyncio.get_event_loop().run_until_complete(example2())

0

有两种方式可以做到这一点,如果你想要命令阻塞并且等待它完成后再继续程序,使用subprocess.call

a = subprocess.call('...')
b = subprocess.call('...')

我认为这不是你想要的。

如果你不想让它们停止整个程序,只需要在调用另一个之前检查其中一个是否完成,那么你应该使用 .communicate

a = subprocess.Popen(['...'])

b = subprocess.Popen(['...'])

....
    for child in {a, b}:
        try:
            result, err = child.communicate(timeout=5)

.communicate 是最优雅、简单和推荐的解决方案。


谢谢您的回答,但我更喜欢非活动等待(没有while:True)此外,我不希望与子进程通信。 - raphaelauv

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接