Python进程池非守护进程?

146

是否可以创建一个非守护进程的 Python Pool? 我想要一个池能够调用另一个池内的函数。

我之所以希望如此,是因为守护进程无法创建进程。 具体而言,这会导致以下错误:

AssertionError: daemonic processes are not allowed to have children
例如,考虑这样一种情况: function_a 有一个池,其中运行 function_b,而 function_b 又有一个池,其中运行 function_c。由于 function_b 在守护进程中运行,而守护进程无法创建进程,因此此函数链将失败。

据我所知,不,这是不可能的。池中的所有工作程序都已被守护程序化,并且不可能进行__依赖注入__。顺便说一句,我不明白你问题的第二部分“我想让一个池能够调用另一个池中的函数”以及如何干扰工作程序被守护程序化的事实。 - mouad
8
因为如果函数a有一个运行函数b的池,而函数b有一个运行函数c的池,那么b中存在一个问题,即它正在一个守护进程中运行,而守护进程无法创建进程。AssertionError: daemonic processes are not allowed to have children - Max
1
不要使用from multiprocessing import Pool,而是使用from concurrent.futures import ProcessPoolExecutor as Pool进行导入。 - Vishal Gupta
10个回答

151
multiprocessing.pool.Pool 类在其 __init__ 方法中创建工作进程,使它们成为守护进程并启动它们,而且在启动之前无法将它们的 daemon 属性重新设置为 False(在启动后也不允许)。但是,您可以创建自己的 multiprocesing.pool.Pool 子类(multiprocessing.Pool 只是一个包装函数),并替换您自己的 multiprocessing.Process 子类,该子类始终是非守护进程,用于工作进程。

以下是如何执行此操作的完整示例。重要部分是顶部的两个类 NoDaemonProcessMyPool,以及在结尾处对您的 MyPool 实例调用 pool.close()pool.join()

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

1
这个问题应该在多进程模块中得到解决(应该提供一个非守护进程的选项)。有人知道是谁在维护它吗? - Mike Vella
1
谢谢!在Windows上,您还需要调用 multiprocessing.freeze_support() - frmdstryr
4
好的。如果有人在使用此代码时出现内存泄漏,请尝试使用 "with closing (MyPool(processes = num_cpu))as pool:" 来正确处理进程池。 - Chris Lucian
54
使用MyPool相比默认的Pool,有哪些不足之处?换句话说,为了启动子进程的灵活性,我需要付出什么代价?(如果没有代价,标准的Pool应该会使用非守护进程) - max
4
是的,不幸的是这是真的。在Python 3.6中,Pool类已经进行了广泛重构,因此Process不再是一个简单的属性,而是一个方法,该方法返回从上下文获取的进程实例。我尝试覆盖此方法以返回一个NoDaemonPool实例,但使用该池时会导致异常AssertionError: daemonic processes are not allowed to have children - Chris Arndt
显示剩余15条评论

57

我需要在Python 3.7中使用非守护进程池,最终改编了已被接受的答案中发布的代码。以下是创建非守护进程池的代码片段:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

由于当前的 multiprocessing 实现已经进行了广泛的重构,以基于上下文,因此我们需要提供一个 NoDaemonContext 类,其中包含我们的 NoDaemonProcess 作为属性。然后,NestablePool 将使用该上下文而不是默认上下文。

话虽如此,我应该警告说,这种方法至少有两个注意事项:

  1. 它仍然依赖于 multiprocessing 包的实现细节,因此可能随时会出现问题。
  2. multiprocessing 之所以让非守护进程使用起来很困难,有很多有效的原因,其中许多在这里解释。在我看来最具有说服力的原因是:

允许子线程使用 subprocess 创建自己的子级别存在风险,如果父线程或子线程在 subprocess 完成并返回之前终止,则会创建一小队僵尸 '孙子'。


2
关于警告:我的用例是并行化任务,但是孙子进程将信息返回给它们的父进程,然后在执行一些必要的本地处理后,父进程再将信息返回给它们的父进程。因此,每个级别/分支都需要明确等待其所有叶子节点。如果您需要显式等待生成的进程完成,那么这个警告是否仍然适用? - A_A
你能否麻烦添加如何使用这个而不是multiprocessing.pool? - Radio Controlled
现在您可以交替使用multiprocessing.Pool和NestablePool。 - Radio Controlled

37

从Python 3.8开始,concurrent.futures.ProcessPoolExecutor就没有这个限制了。它可以毫无问题地拥有一个嵌套的进程池:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

上述演示代码已经测试过 Python 3.8 版本。

ProcessPoolExecutor 的一个限制是它没有 maxtasksperchild。如果您需要此功能,请考虑使用 Massimiliano 的答案

鸣谢:jfs 的回答


4
现在,这显然是最佳解决方案,因为它只需要进行最少的更改。 - dreamflasher
2
完美运行!另外,使用ProcessPoolExecutor.Pool内部的子进程multiprocessing.Pool也是可行的! - raphael
1
很遗憾,这对我不起作用,仍然会出现“daemonic processes are not allowed to have children”的错误。 - Roy Shilkrot
@RoyShilkrot 你具体使用的是哪个版本的Python? - Asclepius
如果你的脚本在celery中,这个方法是行不通的。使用celery==4.3.0python==3.7 - Shift 'n Tab
显示剩余4条评论

29

multiprocessing 模块提供了一个使用进程或线程池的良好接口。根据您当前的用例,您可能需要考虑使用 multiprocessing.pool.ThreadPool 作为外部池,这将产生线程(允许从中生成进程)而不是进程。

它可能会受到 GIL 的限制,但在我的特定情况下(我测试了两种方式),从外部 Pool 创建进程的启动时间,如此处所述,远远超过了使用ThreadPool的解决方案。


非常容易将 Processes 更换为 Threads。了解更多关于如何使用 ThreadPool 解决方案的信息,请参见此处此处


谢谢 - 这对我很有帮助 - 在这里使用线程非常好(可以生成实际表现良好的进程) - trance_dude
1
对于寻找实际解决方案并可能适用于他们情况的人来说,这是一个不错的选择。 - abanana
1
选择“进程池”的用户可能是CPU密集型和/或需要可取消的任务,因此线程不是一个选项。这并没有真正回答问题。 - wim

10

对一些Python版本而言,将标准池替换为自定义池可能会导致错误:AssertionError: group argument must be None for now

在这里我找到了一个可以帮助解决问题的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

4
我遇到的问题是在尝试在模块之间导入全局变量时,导致ProcessPool()行被多次评估。 globals.py
from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后可以安全地从代码中的其他位置导入。
from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

我已经在这里写了一个更加扩展的包装类,围绕着pathos.multiprocessing

另外,如果您的用例只需要异步多进程映射作为性能优化,那么joblib将在幕后管理所有进程池,并允许使用非常简单的语法:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

4

我曾经看到有些人使用 celerymultiprocessing 分支,名为 billiard(multiprocessing 池扩展),该分支允许守护进程生成子进程。 解决方法是简单地将 multiprocessing 模块替换为:

import billiard as multiprocessing

2

以下是如何启动一个进程池的方法,即使您已经在一个守护进程中。此方法已在Python 3.8.5中进行了测试。

首先,定义Undaemonize上下文管理器,它可以暂时删除当前进程的守护状态。

class Undaemonize(object):
    '''Context Manager to resolve AssertionError: daemonic processes are not allowed to have children
    
    Tested in python 3.8.5'''
    def __init__(self):
        self.p = multiprocessing.process.current_process()
        if 'daemon' in self.p._config:
            self.daemon_status_set = True
        else:
            self.daemon_status_set = False
        self.daemon_status_value = self.p._config.get('daemon')
    def __enter__(self):
        if self.daemon_status_set:
            del self.p._config['daemon']
    def __exit__(self, type, value, traceback):
        if self.daemon_status_set:
            self.p._config['daemon'] = self.daemon_status_value

现在,您甚至可以从守护进程中启动池,方法如下:

with Undaemonize():
    pool = multiprocessing.Pool(1)
pool.map(... # you can do something with the pool outside of the context manager 

虽然其他方法的目标是在第一次创建池时不使用守护程序,但这种方法允许您即使在一个守护进程中也可以启动池。


1
current_process()在最新的Python版本中已经被移动到multiprocessing.current_process() - 在Python 3.10.12中进行了测试。 - undefined
1
这是唯一的解决方案,当我们被迫在使用多进程池(例如,在Pytorch数据加载器中)生成的工作进程内生成进程时,它才能正常工作。 - undefined

1
自Python 3.7版本起,我们可以创建非守护进程ProcessPoolExecutor。
在使用多进程时,使用if __name__ == "__main__":是必要的。
from concurrent.futures import ProcessPoolExecutor as Pool

num_pool = 10
    
def main_pool(num):
    print(num)
    strings_write = (f'{num}-{i}' for i in range(num))
    with Pool(num) as subp:
        subp.map(sub_pool,strings_write)
    return None


def sub_pool(x):
    print(f'{x}')
    return None


if __name__ == "__main__":
    with Pool(num_pool) as p:
        p.map(main_pool,list(range(1,num_pool+1)))

如果我更改您的代码并使其从main_poolsub_pool返回值,则它将无法正常工作。result = p.map(main_pool,list(range(1,num_pool+1)))立即返回生成器而不是最终结果。 - Volatil3

1
这提供了一种解决方案,用于当错误看似是误报时。正如詹姆斯所指出的那样,这可能发生在来自一个守护进程的无意间导入

例如,如果您有以下简单代码,则可以不经意地从工作进程中导入WORKER_POOL,从而导致错误。
import multiprocessing

WORKER_POOL = multiprocessing.Pool()

一个简单但可靠的解决方法是:
import multiprocessing
import multiprocessing.pool


class MyClass:

    @property
    def worker_pool(self) -> multiprocessing.pool.Pool:
        # Ref: https://dev59.com/EGw05IYBdhLWcg3w3lol#63984747/
        try:
            return self._worker_pool  # type: ignore
        except AttributeError:
            # pylint: disable=protected-access
            self.__class__._worker_pool = multiprocessing.Pool()  # type: ignore
            return self.__class__._worker_pool  # type: ignore
            # pylint: enable=protected-access

在上述解决方法中,MyClass.worker_pool 可以被使用而不会出错。如果你认为这种方法可以改进,请让我知道。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接