Python,Popen和select - 等待进程终止或超时

23

我使用以下命令来运行一个子进程:

  p = subprocess.Popen("subprocess", 
                       stdout=subprocess.PIPE, 
                       stderr=subprocess.PIPE, 
                       stdin=subprocess.PIPE)

这个子进程可能会立即以stderr错误退出,或继续运行。我想检测这两种情况中的任意一种 - 后者需要等待几秒钟。

我尝试了以下方法:

  SECONDS_TO_WAIT = 10
  select.select([], 
                [p.stdout, p.stderr], 
                [p.stdout, p.stderr],
                SECONDS_TO_WAIT)

但它只返回:

  ([],[],[])

在任何一种情况下,我能做什么?


2
注意:如果子进程产生足够的输出,它可能会死锁。如果使用PIPE,您需要消耗stdout/stderr。 - jfs
7个回答

15

您是否尝试过使用Popen.Poll()方法。您可以这样做:

p = subprocess.Popen("subprocess", 
                   stdout=subprocess.PIPE, 
                   stderr=subprocess.PIPE, 
                   stdin=subprocess.PIPE)

time.sleep(SECONDS_TO_WAIT)
retcode = p.poll()
if retcode is not None:
   # process has terminated

这将导致您始终等待10秒钟,但如果失败情况很少,这个时间会分摊到所有成功的情况中。


编辑:

这样如何:

t_nought = time.time()
seconds_passed = 0

while(p.poll() is not None and seconds_passed < 10):
    seconds_passed = time.time() - t_nought

if seconds_passed >= 10:
   #TIMED OUT

虽然这种方法有点繁忙等待的不美观之处,但我认为它实现了你想要的功能。

此外,再次查看select调用文档后,我认为你可能需要将其更改为以下内容:

SECONDS_TO_WAIT = 10
  select.select([p.stderr], 
                [], 
                [p.stdout, p.stderr],
                SECONDS_TO_WAIT)

由于通常希望从stderr中读取内容,因此需要知道何时有内容可供读取(即失败情况)。

希望对你有所帮助。


谢谢您的回复,但不幸的是我不能使用那种方法,因为失败情况是最常见的。程序预计会有大约600个失败(每次调整参数),然后在最后获得一个成功。目前,我正在使用commands.getstatusoutput,但成功时它会挂起。 - Brian Leahy
修改了我的答案以考虑到您的特定用例。 - grieve
如果 if retcode: 失败,那么子进程已经成功完成,即 retcode == 0。你可以使用 if retcode is not None: - jfs
@J.F.Sebastian:发现得好!已经编辑修复了。 - grieve

7

我想分享一个解决方案。当你的进程需要或不需要超时时,这个方案是有效的,但它使用了一个半忙循环。

def runCmd(cmd, timeout=None):
    '''
    Will execute a command, read the output and return it back.

    @param cmd: command to execute
    @param timeout: process timeout in seconds
    @return: a tuple of three: first stdout, then stderr, then exit code
    @raise OSError: on missing command or if a timeout was reached
    '''

    ph_out = None # process output
    ph_err = None # stderr
    ph_ret = None # return code

    p = subprocess.Popen(cmd, shell=True,
                         stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE)
    # if timeout is not set wait for process to complete
    if not timeout:
        ph_ret = p.wait()
    else:
        fin_time = time.time() + timeout
        while p.poll() == None and fin_time > time.time():
            time.sleep(1)

        # if timeout reached, raise an exception
        if fin_time < time.time():

            # starting 2.6 subprocess has a kill() method which is preferable
            # p.kill()
            os.kill(p.pid, signal.SIGKILL)
            raise OSError("Process timeout has been reached")

        ph_ret = p.returncode


    ph_out, ph_err = p.communicate()

    return (ph_out, ph_err, ph_ret)

3

这里有一个不错的例子:

from threading import Timer
from subprocess import Popen, PIPE

proc = Popen("ping 127.0.0.1", shell=True)
t = Timer(60, proc.kill)
t.start()
proc.wait()

2

Python 3.3

import subprocess as sp

try:
    sp.check_call(["/subprocess"], timeout=10,
                  stdin=sp.DEVNULL, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
except sp.TimeoutError:
    # timeout (the subprocess is killed at this point)
except sp.CalledProcessError:
    # subprocess failed before timeout
else:
    # subprocess ended successfully before timeout

请参见TimeoutExpired文档


2
使用select和睡眠并没有太多意义。 select(或任何内核轮询机制)本质上用于异步编程,但您的示例是同步的。因此,要么重新编写代码以使用常规阻塞方式,要么考虑使用Twisted:
from twisted.internet.utils import getProcessOutputAndValue
from twisted.internet import reactor    

def stop(r):
    reactor.stop()
def eb(reason):
    reason.printTraceback()
def cb(result):
    stdout, stderr, exitcode = result
    # do something
getProcessOutputAndValue('/bin/someproc', []
    ).addCallback(cb).addErrback(eb).addBoth(stop)
reactor.run()

顺便提一下,使用Twisted编写自己的ProcessProtocol是一种更安全的方法:

http://twistedmatrix.com/projects/core/documentation/howto/process.html


在使用此示例中的代码和使用完整的ProcessProtocol之间有什么区别? - Wolkenarchitekt

1
如果像您在上面的评论中所说的那样,您只是每次微调输出并重新运行命令,那么类似以下的东西是否可行?
from threading import Timer
import subprocess

WAIT_TIME = 10.0

def check_cmd(cmd):
    p = subprocess.Popen(cmd,
        stdout=subprocess.PIPE, 
            stderr=subprocess.PIPE)
    def _check():
        if p.poll()!=0:
            print cmd+" did not quit within the given time period."

    # check whether the given process has exited WAIT_TIME
    # seconds from now
    Timer(WAIT_TIME, _check).start()

check_cmd('echo')
check_cmd('python')

上述代码运行后输出:
python did not quit within the given time period.

我能想到上述代码唯一的缺点是在不断运行check_cmd时可能会出现重叠进程。

0
这是对Evan答案的改述,但它考虑了以下内容:
  1. 明确取消计时器对象:如果计时器间隔很长,而进程将通过其“自身意愿”退出,则可能会挂起您的脚本:(
  2. 计时器方法中存在固有竞争(计时器尝试在进程死亡后“立即”杀死进程,在Windows上会引发异常)。

      DEVNULL = open(os.devnull, "wb")
      process = Popen("c:/myExe.exe", stdout=DEVNULL) # 不需要stdout
    
      def kill_process():
      """ 杀死进程的帮助程序"""
      try:
         process.kill()
       except OSError:
         pass  # 忽略错误
    
      timer = Timer(timeout_in_sec, kill_process)
      timer.start()
    
      process.wait()
      timer.cancel()
    

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接