处理多进程池中工作进程死亡问题

8

我有一个简单的服务器:

from multiprocessing import Pool, TimeoutError
import time
import os


if __name__ == '__main__':
    # start worker processes
    pool = Pool(processes=1)

    while True:
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ())  # runs in *only* one process
        try:
            print(res.get(timeout=1))             # prints the PID of that process
        except TimeoutError:
            print('worker timed out')

        time.sleep(5)

    pool.close()
    print("Now the pool is closed and no longer available")
    pool.join()
    print("Done")

如果我运行这个,我会得到类似于这样的东西:
47292
47292

当服务器正在运行时,我执行了 kill 47292 命令。这时会启动一个新的工作进程,但服务器的输出内容为:

47292
47292
worker timed out
worker timed out
worker timed out

线程池仍在尝试向旧的工作进程发送请求。

我已经在服务器和工作进程中进行了一些信号捕获方面的工作,虽然能够获得稍微更好的行为,但服务器似乎在关闭时仍在等待死亡的子进程(即 pool.join() 永远不会结束),而这种情况发生在工作进程被杀死后。

如何正确处理工作进程的死亡?

如果没有任何一个工作进程死亡,从服务器进程优雅地关闭工作进程似乎是有效的。

(在Python 3.4.4上运行,但如果有帮助的话我可以升级。)

更新: 有趣的是,如果使用 processes=2 创建线程池,并且您杀死一个工作进程,等待几秒钟再杀死另一个工作进程,则此工作超时问题不会发生。但是,如果您快速连续杀死两个工作进程,则“工作超时”问题将再次出现。

也许相关的是当问题发生时,杀死服务器进程将使工作进程继续运行。

1个回答

4
这种行为来自于的设计。当你杀死一个工作进程时,你可能会杀死持有锁的进程。当这个进程在持有锁的时候被杀死,没有其他进程能够再读取了,这将破坏,因为它无法再与其工作进程通信。
因此,实际上没有办法杀死一个工作进程,并确保你的在之后仍然正常工作,因为你可能会陷入死锁状态。 不处理工作进程的死亡。你可以尝试使用代替(具有稍微不同的API),它默认处理进程失败。当一个进程在中死亡时,整个执行器都会关闭,并返回一个错误。
请注意,此实现中还存在其他死锁问题,应在 loky 中修复。(免责声明:我是这个库的维护者)。此外,loky 允许您使用 ReusablePoolExecutor 和方法 _resize 调整现有的 executor 的大小。如果您感兴趣,让我知道,我可以提供一些帮助来开始使用此包。(我意识到我们的文档还需要一些工作...0_0)

我的使用案例是一个长时间运行的服务器,其中父进程从外部队列中读取作业,并将每个作业交给子进程执行(每个作业一个子进程)。显然,我希望能够处理子进程的死亡。也许我应该使用Process并自己解决问题,但我想避免尝试解决一些更高级别对象中已经解决的相同问题。如果ProcessPoolExecutor听起来是我问题的最佳解决方案,我很乐意获取有关如何入门的更多信息。谢谢! - ivo
你想处理孩子进程死亡的问题吗?如果您想要处理由外部原因杀死的子进程的终止,这是非常困难的,因为您可能会导致死锁。 如果您想要处理工人的终止,可以使用带有返回每个任务的进程的“池”。然后,“池”将为提交的每个新任务生成一个新进程。 如果您想要更精确的答案,请明确您的约束条件。从您的问题状态来看,您想能够使用外部“kill”来杀死“Pool.worker”,但根据我的回答,这是不可能的。 - Thomas Moreau
就此做个总结:你在问题中描述的破碎的Pool是由死锁导致的,正如答案所述。如果你想处理一个工作进程的死亡,你需要明确死因:谋杀还是意外? :) 根据这些精确的情况,你可以使用不同的设计。关于工作进程的非python死亡的主要问题在于无法保证同步原语(Lock)不会陷入不可恢复的状态。 - Thomas Moreau
感谢您深入的回答。死亡将是意外的 - 一些工人调用复杂的第三方C++代码,可能会导致段错误。在这种情况下,我可以捕获信号并让工作线程干净地关闭(就池而言)。这可能是一个罕见的情况,所以我可以简单地添加一个监视任务来验证池仍在工作且没有死锁。 - ivo
1
以下是一些死锁的示例(带有腌制错误)http://loky.readthedocs.io/en/stable/auto_examples/index.html。此外,请注意,`futures`与`concurrent.futures`相比非常不同。例如,它无法检测到已停止工作的工作进程。如果您想要将`concurrent.futures`可靠地回退到python2.7,则应使用`loky.ProcessPoolExecutor`。 - Thomas Moreau
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接