我遇到的问题是在尝试在模块之间导入全局变量时,导致ProcessPool()行被多次评估。
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool
class SingletonMeta(type):
    """Metaclass that turns every class using it into a singleton.

    It also injects ``__copy__``/``__deepcopy__`` into the class namespace
    so that copying a singleton *instance* yields the same instance rather
    than a duplicate.
    """
    def __new__(cls, name, bases, namespace):
        # Make copy.copy()/copy.deepcopy() of an instance return the
        # instance itself -- a singleton must never be duplicated.
        # (Renamed the ``dict`` parameter: it shadowed the builtin.)
        namespace['__deepcopy__'] = namespace['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, namespace)

    def __init__(cls, name, bases, namespace):
        super(SingletonMeta, cls).__init__(name, bases, namespace)
        # Lazily-created cached instance; populated on first __call__.
        cls.instance = None

    def __call__(cls, *args, **kw):
        # First ClassName() builds the instance; every later call returns
        # the cached object.
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, memo):
        # Deep-copying the *class* itself must return the class unchanged.
        # BUG FIX: the original returned ``item.__class__.instance`` where
        # ``item`` is the memo dict -- ``dict`` has no ``instance``
        # attribute, so any invocation raised AttributeError.
        return self
class Globals(object):
    """Process-wide singleton holding the shared pools, manager and lock.

    Workaround for ``AssertionError: daemonic processes are not allowed to
    have children``: importing this module from several places caused the
    module body to be re-evaluated, so ``ProcessPool()`` was re-executed
    inside child processes.  Making the class a singleton guarantees the
    pools are constructed exactly once.

    FIX: this docstring was previously placed *after* ``__metaclass__``,
    making it a no-op string expression instead of ``__doc__`` (PEP 257
    requires the docstring to be the first statement).
    """
    # Python 2 metaclass hook; the Python 3 spelling would be
    # ``class Globals(metaclass=SingletonMeta)``.
    __metaclass__ = SingletonMeta

    def __init__(self):
        # Parenthesised single-argument print works under Python 2 and 3.
        print("%s::__init__()" % (self.__class__.__name__))
        self.shared_manager = Manager()           # shared-state manager
        self.shared_process_pool = ProcessPool()  # built once, reused everywhere
        self.shared_thread_pool = ThreadPool()
        self.shared_lock = Lock()
然后可以安全地从代码中的其他位置导入。
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
我已经在这里围绕 pathos.multiprocessing 写了一个扩展性更强的包装类:
另外,如果您的用例只需要异步多进程映射作为性能优化,那么joblib将在幕后管理所有进程池,并允许使用非常简单的语法:
squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )
AssertionError: daemonic processes are not allowed to have children
。 - Max
不要使用 from multiprocessing import Pool 进行导入,而是使用 from concurrent.futures import ProcessPoolExecutor as Pool。 - Vishal Gupta