终止Python多进程池

23

我正在运行一个使用多进程模块来生成一些工作线程的Python程序。使用 Pool.map 函数对文件列表进行处理。

在某个时刻,我想要停止一切并终止脚本的执行。

通常情况下,从命令行使用 Ctrl+C 可以实现这一点。但是,在这种情况下,我认为这只会中断其中的一个工作进程,并生成一个新的工作进程。

所以,我最终使用 ps aux | grep -i python 命令并在相关的进程ID上使用 kill -9 命令。

有没有更好的方式可以让中断信号将所有内容暂停?


2
你的其他线程在做什么?你不能在它们中设置一个标志并将其设置为false,然后退出执行吗?问题是获取“Ctrl+C”行为 - 你可以让一个无用的Tk窗口弹出,并将一个“停止”函数绑定到它上面,以便在所有线程(或全局线程)上设置一个kill标志。你甚至可以绑定一个函数,使用Popen执行你的终端kill命令,如果你厌烦了去终端输入。 - en_Knight
4个回答

26

SIGQUIT(Ctrl + \)会在Python 2.x下杀死所有进程。

您也可以升级到Python 3.x版本,此行为(仅子进程接收信号)似乎已被修复。


2
Unix 继续让我惊喜和愉悦。尽管我已经进行了多年的探索使用,但我仍然不知道这个命令。谢谢。 - Richard
我认为问题的重点不在于使用哪个信号,而是发送类似 SIGTERM 这样的信号将会杀死主进程但不会影响子进程,正如 @dano 在下面所讨论的那样。 - user1071847
我遇到了同样的问题,正在Windows上运行Python(3.9),所以来到了这个问题。对于那些处于同样困境的人,适当的信号是 CTRL + BREAK(https://learn.microsoft.com/en-us/windows/console/ctrl-c-and-ctrl-break-signals)。 - Aerinmund Fagelson

4

很遗憾,Python 2.x 真的没有一个好的解决方案来解决这个问题。我知道的最好的解决方法是使用 pool.map_async(...).get(timeout=<large number>) 而不是使用 pool.map。问题在于 pool.map 调用了 threading.Condition.wait(),由于某种原因,在 Python 2.x 中无法通过 Ctrl+C 中断(在 Python 3 中可以)。当你使用 map_async() 时,它会调用 threading.Condition.wait(timeout=<large number>),最终执行繁忙等待循环,这个过程可以被 Ctrl+C 中断。

亲自试试吧:

c = threading.Condition()
try:
    c.acquire()
    c.wait()  # You won't be able to interrupt this
except KeyboardInterrupt:
    print("Caught it")

c = threading.Condition()
try:
    c.acquire()
    c.wait(timeout=100)  # You CAN interrupt this
except KeyboardInterrupt:
    print("Caught it")

因此,要使您的 map 调用可中断,请执行以下操作:
if __name__ == "__main__":
    p = multiprocessing.Pool()
    try:
        p.map_async(func, iterable).get(timeout=10000000)
    except KeyboardInterrupt:
        print("Caught it")
        # Optionally try to gracefully shut down the worker processes here.
        p.close()
        # DON'T join the pool. You'll end up hanging.

需要注意的是,正如phihag所指出的那样,这个问题已经在Python 3.4中得到了解决(以及可能在3.x之前的版本中也得到了解决)。


3
有几种方法。第一种方法是使用Threading中的方法将线程标记为守护进程。
在代码中加入如下语句:
myThread.setDaemon(true)

在多进程编程中,
myThread.daemon = True

所有被标记为守护进程的线程将随主线程一起终止。这不是正确的做法,因为它不允许线程清理。

下一种方式是通过try-catch监听KeyboardInterrupt,然后像以下那样使用.join()来加入线程。

try:
    myThread = MyThread()
except KeyboardInterrupt:
    myThread.join()

如果您的线程处于循环中,可以使用布尔条件,并将其设置为false。当条件为false时,执行清理操作。
class MyThread(Threading.thread):
    def __init__(self):
        self.alive=True
    def run(self):
        while self.alive:
            #do stuff
        #cleanup goes here, outside the loop
try:
    myThread = MyThread()
except KeyboardInterrupt:
    myThread.alive = False
    myThread.join()

1
multiprocessing.Pool 中的进程始终是守护进程,因此在这里没有帮助。在父进程中尝试捕获 KeyboardInterrupt 也无济于事,因为子进程最终会收到 KeyboardInterrupt 而不是父进程。 - dano
1
另外,在您的第三个示例中,从父进程设置类的属性不会对子进程产生任何影响,因为它们将分别获得属性的副本。 - dano
好的。我一直使用 threading.Thread,所以我想它不会继承过来。我只是有点假设它们会继承。当你做出假设时就会发生什么... - ollien

0

我发现在这种情况下使用Python信号库非常有效。当您初始化池时,可以将信号处理程序传递给每个线程,以设置主线程收到键盘中断时的默认行为。

如果您真的只想让所有东西都停止,可以在主线程中捕获键盘中断异常,并调用pool.terminate()。


如果你真的想让所有东西都停止运行,可以在主线程中捕获键盘中断异常,并调用pool.terminate()。但是这样做是不起作用的。 - user1071847

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接