Python subprocess:当命令退出时回调

67

我目前正在使用subprocess.Popen(cmd, shell=TRUE)来启动一个程序。

我对Python还比较新,但感觉应该有一些API可以让我做类似以下操作的事情:

subprocess.Popen(cmd, shell=TRUE,  postexec_fn=function_to_call_on_exit)
我这样做是为了让 function_to_call_on_exit 能够根据 cmd 已经退出的情况做一些事情(例如,记录当前运行的外部进程数)。我认为我可以相对简单地将 subprocess 封装在一个类中,将线程与 Popen.wait() 方法结合起来,但由于我还没有在 Python 中做过线程操作,而且似乎这种情况很常见,有可能已经存在相应的 API,所以我想先尝试找到它。提前感谢您的帮助 :)

1
有人可以评论或研究一下asyncio subprocess模块吗?我认为这应该是正确的工具,但我自己从未使用过。 - Ben Mares
@BenMares 虽然这个问题已经有一段时间了,也许可以在这里添加一条关于asyncio的评论?尽管API在一些细微之处可能有所不同,但通常情况下,asyncio可以像下面的示例一样使用。它需要一个asyncio事件循环,通常至少需要一个顶层同步调用来启动事件循环或返回一些正在运行的循环。iPython和Spyder IDE可能提供了一些关于GUI和网络应用程序中asyncio的示例,可能还有一些类似popen的调用。标准库文档完整介绍了asyncio API及其标准库的相似之处。还有Trio。 - undefined
10个回答

76

你说得对 - 没有针对此的好的API。你的第二点也是正确的 - 使用线程轻松设计一个可以为您执行此操作的函数。

import threading
import subprocess

def popen_and_call(on_exit, popen_args):
    """
    Runs the given args in a subprocess.Popen, and then calls the function
    on_exit when the subprocess completes.
    on_exit is a callable object, and popen_args is a list/tuple of args that 
    would give to subprocess.Popen.
    """
    def run_in_thread(on_exit, popen_args):
        proc = subprocess.Popen(*popen_args)
        proc.wait()
        on_exit()
        return
    thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args))
    thread.start()
    # returns immediately after the thread starts
    return thread

即使在Python中,线程编程也是非常容易的,但请注意,如果on_exit()计算成本很高,您将需要使用multiprocessing将其放入单独的进程中(以便GIL不会拖慢程序)。实际上非常简单 - 您基本上只需要将所有调用threading.Thread替换为multiprocessing.Process,因为它们遵循(几乎)相同的API。


谢谢。这正是我要做的事情。 不幸的是,出现了一个问题,在简单的场景中我无法复制,但在我的实际程序中却可以:(如果我使用线程而不是多进程,则proc.wait()将不会返回,直到我对子进程进行其他操作。如果我使用多进程,它就能完美地工作。但是,使用多进程需要处理共享内存。我已经做到了,但我不确定是否满意于开销。 有什么想法,为什么在线程和进程中使用subprocess可能会表现不同(更改其使用方式而不做其他更改即可导致/解决问题)? - Who
@Who 很抱歉 - 我不知道为什么线程无法工作,或者为什么在这种情况下它会与多线程有所不同。这似乎非常奇怪。共享内存的开销是性能瓶颈吗,还是只是难看? - Daniel G
1
@DanielG 你可以考虑采用Phil的答案中提到的更改,以保持Popen接口。 - orodbhen

22

Python 3.2有一个名为concurrent.futures的模块(对于旧版Python < 3.2,可通过pip install futures获得):

pool = Pool(max_workers=1)
f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
f.add_done_callback(callback)

回调函数将在调用f.add_done_callback()的同一进程中被调用。

完整程序

import logging
import subprocess
# to install run `pip install futures` on Python <3.2
from concurrent.futures import ThreadPoolExecutor as Pool

info = logging.getLogger(__name__).info

def callback(future):
    if future.exception() is not None:
        info("got exception: %s" % future.exception())
    else:
        info("process returned %d" % future.result())

def main():
    logging.basicConfig(
        level=logging.INFO,
        format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                "%(levelname)-5s %(msg)s"))

    # wait for the process completion asynchronously
    info("begin waiting")
    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    pool.shutdown(wait=False) # no .submit() calls after that point
    info("continue waiting asynchronously")

if __name__=="__main__":
    main()

输出

$ python . && python3 .
0013 05382 MainThread INFO  begin waiting
0021 05382 MainThread INFO  continue waiting asynchronously
done
2025 05382 Thread-1   INFO  process returned 0
0007 05402 MainThread INFO  begin waiting
0014 05402 MainThread INFO  continue waiting asynchronously
done
2018 05402 Thread-1   INFO  process returned 0

1
耶稣,这真是黄金啊。 - skamsie
@skamsie:除非您已经使用池来限制并发子进程的数量收集它们的输出,否则与单个线程相比,使用池可能会过度。这里有一种冗长的低级方法,它不使用线程,而是使用SIGCHLD信号QProcess.finished()显示了一个示例,说明如何高效地完成此操作,并提供简单的可移植API。 - jfs

16

我修改了Daniel G的答案,简单地将subprocess.Popen中的argskwargs直接传递而不是作为一个单独的元组/列表,因为我想要在subprocess.Popen中使用关键字参数。

在我的情况下,我有一个方法postExec(),我想在subprocess.Popen('exe', cwd=WORKING_DIR)之后运行它。

有了以下代码,它就变成了popenAndCall(postExec, 'exe', cwd=WORKING_DIR)

import threading
import subprocess

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    """
    Runs a subprocess.Popen, and then calls the function onExit when the
    subprocess completes.

    Use it exactly the way you'd normally use subprocess.Popen, except include a
    callable to execute as the first argument. onExit is a callable object, and
    *popenArgs and **popenKWArgs are simply passed up to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
        return

    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()

    return thread # returns immediately after the thread starts

7
我曾经遇到过同样的问题,使用了 multiprocessing.Pool 解决了它。其中有两个技巧:
  1. 将池的大小设置为1
  2. 在长度为1的可迭代对象中传递可迭代参数
结果是一个函数在完成时执行回调。
def sub(arg):
    print arg             #prints [1,2,3,4,5]
    return "hello"

def cb(arg):
    print arg             # prints "hello"

pool = multiprocessing.Pool(1)
rval = pool.map_async(sub,([[1,2,3,4,5]]),callback =cb)
(do stuff) 
pool.close()

在我的情况下,我希望调用也是非阻塞的。这样做效果非常好。


我需要 pool.starmap(...) 或者 .starmap_async(...) - colllin

2

我受到Daniel G.答案的启发,实现了一个非常简单的用例 - 在我的工作中,我经常需要使用不同的参数重复调用相同的(外部)进程。我曾经黑客式地确定每个特定调用何时完成,但现在我有了更清晰的方式来发出回调。

我喜欢这个实现,因为它非常简单,但它允许我对多个处理器发出异步调用(请注意我使用multiprocessing而不是threading)并在完成时接收通知。

我测试了示例程序,效果很好。请随意编辑并提供反馈。

import multiprocessing
import subprocess

class Process(object):
    """This class spawns a subprocess asynchronously and calls a
    `callback` upon completion; it is not meant to be instantiated
    directly (derived classes are called instead)"""
    def __call__(self, *args):
    # store the arguments for later retrieval
    self.args = args
    # define the target function to be called by
    # `multiprocessing.Process`
    def target():
        cmd = [self.command] + [str(arg) for arg in self.args]
        process = subprocess.Popen(cmd)
        # the `multiprocessing.Process` process will wait until
        # the call to the `subprocess.Popen` object is completed
        process.wait()
        # upon completion, call `callback`
        return self.callback()
    mp_process = multiprocessing.Process(target=target)
    # this call issues the call to `target`, but returns immediately
    mp_process.start()
    return mp_process

if __name__ == "__main__":

    def squeal(who):
    """this serves as the callback function; its argument is the
    instance of a subclass of Process making the call"""
    print "finished %s calling %s with arguments %s" % (
        who.__class__.__name__, who.command, who.args)

    class Sleeper(Process):
    """Sample implementation of an asynchronous process - define
    the command name (available in the system path) and a callback
    function (previously defined)"""
    command = "./sleeper"
    callback = squeal

    # create an instance to Sleeper - this is the Process object that
    # can be called repeatedly in an asynchronous manner
    sleeper_run = Sleeper()

    # spawn three sleeper runs with different arguments
    sleeper_run(5)
    sleeper_run(2)
    sleeper_run(1)

    # the user should see the following message immediately (even
    # though the Sleeper calls are not done yet)
    print "program continued"

示例输出:

program continued
finished Sleeper calling ./sleeper with arguments (1,)
finished Sleeper calling ./sleeper with arguments (2,)
finished Sleeper calling ./sleeper with arguments (5,)

以下是sleeper.c的源代码,这是我示例的“耗时”外部进程。
#include<stdlib.h>
#include<unistd.h>

int main(int argc, char *argv[]){
  unsigned int t = atoi(argv[1]);
  sleep(t);
  return EXIT_SUCCESS;
}

编译为:

gcc -o sleeper sleeper.c

谢谢!缩进有些问题,但是在那之后,这正是我需要的,在XMLRPC服务器中启动异步进程,当客户端发出“运行”命令时。 - tahoar

2

感谢大家指引我正确的方向。我从这里找到了相关的内容并制作了一个类,并添加了一个停止函数来终止该进程:

class popenplus():
  def __init__(self, onExit, *popenArgs, **popenKWArgs):
    thread = Thread(target=self.runInThread, args=(onExit, popenArgs, popenKWArgs))
    thread.start()

  def runInThread(self, onExit, popenArgs, popenKWArgs):
    self.proc = Popen(*popenArgs, **popenKWArgs)
    self.proc.wait()
    self.proc = None
    onExit()

  def stop(self):
    if self.proc:
      self.proc.kill()

2
大多数对这个问题的回答都建议为每个进程启动一个线程,只是为了等待那个回调。我觉得这样做是不必要的浪费:一个线程应该足够处理通过这种方式创建的所有进程的回调。
另一个回答建议使用信号,但这会导致一个竞态条件,即在前一次调用完成之前,信号处理程序可能会再次被调用。在Linux上,signalfd(2)可以解决这个问题,但它在Python中不受支持(虽然可以通过ctypes轻松添加)。
Python中asyncio使用的另一种解决方案是使用signal.set_wakeup_fd。然而,还有另一种基于操作系统将在进程退出时关闭所有打开的文件描述符的事实的解决方案。
import os
import select
import subprocess
import threading
import weakref


def _close_and_join(fd, thread):
    os.close(fd)
    thread.join()


def _run_poll_callbacks(quitfd, poll, callbacks):
    poll.register(quitfd, select.POLLHUP)
    while True:
        for fd, event in poll.poll(1000.0):
            poll.unregister(fd)
            if fd == quitfd:
                return
            callback = callbacks.pop(fd)
            if callback is not None:
                callback()


class PollProcs:
    def __init__(self):
        self.poll = select.poll()
        self.callbacks = {}
        self.closed = False

        r, w = os.pipe()
        self.thread = threading.Thread(
            target=_run_poll_callbacks, args=(r, self.poll, self.callbacks)
        )
        self.thread.start()
        self.finalizer = weakref.finalize(self, _close_and_join, w, self.thread)

    def run(self, cmd, callback=None):
        if self.closed:
            return

        r, w = os.pipe()
        self.callbacks[r] = callback
        self.poll.register(r, select.POLLHUP)
        popen = subprocess.Popen(cmd, pass_fds=(w,))
        os.close(w)
        print("running", " ".join(cmd), "as", popen.pid)
        return popen


def main():
    procs = PollProcs()

    for i in range(3, 0, -1):
        procs.run(["sleep", str(i)], callback=lambda i=i: print(f"sleep {i} done?"))

    import time

    print("Waiting...")
    time.sleep(3)


if __name__ == "__main__":
    main()

如果不需要支持MacOS,那么选择select.epoll可能是一个更好的选择,因为它允许更新正在进行的轮询。

2
自 Python 3.2 版本开始,在 concurrent.futures 模块中也提供了 ProcesPoolExecutor(https://docs.python.org/3/library/concurrent.futures.html)。使用方法与上面提到的 ThreadPoolExecutor 相同。可以通过 executor.add_done_callback() 方法添加一个退出回调函数。

2
在POSIX系统中,当子进程退出时,父进程会收到一个SIGCHLD信号。要在子进程命令退出时运行回调函数,请在父进程中处理SIGCHLD信号。可以使用如下示例代码:
import signal
import subprocess

def sigchld_handler(signum, frame):
    # This is run when the child exits.
    # Do something here ...
    pass

signal.signal(signal.SIGCHLD, sigchld_handler)

process = subprocess.Popen('mycmd', shell=TRUE)

请注意,这在Windows上不起作用。

0
据我所知,至少在subprocess模块中没有这样的API。你需要自己编写一些代码,可能需要使用线程来实现。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接