使用Python multiprocessing在子进程中创建子进程失败

13

在尝试在Python中创建嵌套子进程时,我观察到了这种行为。下面是父进程程序parent_process.py

import multiprocessing
import child_process

pool = multiprocessing.Pool(processes=4)
for i in range(4):
        pool.apply_async(child_process.run, ())
pool.close()
pool.join()
父程序在下面的子程序child_process.py中调用“run”函数:
import multiprocessing

def run():
        pool = multiprocessing.Pool(processes=4)
        print 'TEST!'
        pool.close()
        pool.join()

当我运行父程序时,没有任何输出并且程序很快退出。然而,如果将print 'TEST!'移动到嵌套的子进程创建之前的一行,'TEST!'会被打印4次。

由于子进程中的错误不会在屏幕上打印出来,这似乎表明当一个子进程创建它自己的嵌套子进程时,程序会崩溃。

有人能解释一下背后发生了什么吗?谢谢!


我知道嵌套有一个限制,会引发异常,但在这种情况下,您还远未达到该限制。我认为问题可能出现在其他地方,也许是在“池”机制中... - CoMartel
你想通过这个来改善什么?通过启动一堆Pool比启动一个更大的Pool不能使你更快。 - CoMartel
@HarryPotfleur 的意图是让每个子进程管理自己的一组进程,因此程序在逻辑上看起来更整洁。如果我有 16 个核心,那么 4x4 嵌套的进程可以同时运行。我在下面提供了一个解决方法。 - Da Kuang
在这种情况下,您可以生成4个multiprocessing.Process,每个进程都启动一个Pool。如果需要,我可以帮助您编写代码。 - CoMartel
3个回答

8
根据multiprocessing的文档,守护进程无法生成子进程。 multiprocessing.Pool使用守护进程确保程序退出时不会泄漏。

6

正如noxdafox所说,multiprocessing.Pool使用守护进程。我找到了一个简单的解决方法,改用multiprocess.Process

父程序:

import multiprocessing
import child_process

processes = [None] * 4
for i in range(4):
    processes[i] = multiprocessing.Process(target=child_process.run, args=(i,))
    processes[i].start()
for i in range(4):
    processes[i].join()

子程序(名称为child_process.py):

import multiprocessing

def test(info):
    print 'TEST', info[0], info[1]

def run(proc_id):
    pool = multiprocessing.Pool(processes=4)
    pool.map(test, [(proc_id, i) for i in range(4)])
    pool.close()
    pool.join()

输出结果是16行TEST代码:
TEST 0 0
TEST 0 1
TEST 0 3
TEST 0 2
TEST 2 0
TEST 2 1
TEST 2 2
TEST 2 3
TEST 3 0
TEST 3 1
TEST 3 3
TEST 3 2
TEST 1 0
TEST 1 1
TEST 1 2
TEST 1 3

1
我没有足够的声望来发表评论,但是由于Python版本决定了运行分层多进程的选项(例如2015年的一篇文章),所以我想分享我的经验。Da Kuang的上述解决方案对我有用,使用Anaconda 3运行Python 3.7.1。

我对child_process.py进行了小修改,使其运行一段时间的CPU,以便我可以检查系统监视器,验证16个同时运行的进程。

import multiprocessing

def test(info):
    print('TEST', info[0], info[1])
    aa=[1]*100000
    a=[1 for i in aa if all([ii<1 for ii in aa])]
    print('exiting')

def run(proc_id):
    pool = multiprocessing.Pool(processes=4)
    pool.map(test, [(proc_id, i) for i in range(4)])
    pool.close()
    pool.join()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接