多进程:如何在一个类中定义的函数上使用Pool.map?

209

当我运行以下类似命令时:

from multiprocessing import Pool

p = Pool(5)
def f(x):
     return x*x

p.map(f, [1,2,3])

它能够正常工作。然而,将其作为类的一个函数:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

给我以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我曾看到Alex Martelli发表的一篇帖子,讨论了相同类型的问题,但它并不够明确。

1
"this as a function of a class"? 你能贴出实际获取错误的代码吗?如果没有实际代码,我们只能猜测你做错了什么。 - S.Lott
一般而言,存在比Python标准pickle模块更强大的序列化模块(例如在此答案中提到的picloud模块)。 - klaus se
1
我在 IPython.Parallel 中也遇到了闭包的类似问题,但那里可以通过将对象推送到节点来解决该问题。在多进程中解决这个问题似乎相当麻烦。 - Alex S
这里的 calculate 是可被 pickle 序列化的,因此似乎可以通过以下步骤解决:1)创建一个函数对象,其构造函数复制了一个 calculate 实例;2)将该函数对象的实例传递给 Poolmap 方法。是这样吗? - rd11
2
@math 我不相信 Python 的“最近的更改”会有任何帮助。multiprocessing 模块的一些限制是由于其旨在成为跨平台实现,以及 Windows 中缺少类似 fork(2) 的系统调用所致。如果您不关心 Win32 支持,则可能有一个更简单的基于进程的解决方法。或者,如果您准备使用线程而不是进程,则可以将 from multiprocessing import Pool 替换为 from multiprocessing.pool import ThreadPool as Pool - Aya
20个回答

94

迄今为止发布的代码我无法使用,因为使用"multiprocessing.Pool"的代码不支持lambda表达式,而没有使用"multiprocessing.Pool"的代码会生成与工作项数量相同的进程。

我修改了代码,使其生成预定义数量的工作者,并仅在存在空闲工作者时才遍历输入列表。 我还启用了工作者的"守护进程"模式,以便ctrl-c可以按预期工作。

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

5
你如何使进度条与这个 parmap 函数正常工作? - shockburner
2
一个问题--我使用了这个解决方案,但是注意到我生成的Python进程仍然在内存中保持活动状态。你有什么快速想法可以在parmap退出时杀死它们吗? - CompEcon
2
@greole将(None, None)作为最后一项传递,表示对于每个进程,fun已经到达了项目序列的末尾。 - aganders3
4
如果你有足够的声望值,你可以通过悬赏来请求别人帮你完成任务。 - Mark
3
_pickle.PicklingError: 无法序列化 <function <lambda> at 0x00000254E1FDE6A8>:在 main 上查找属性 <lambda> 失败 - Wood
显示剩余7条评论

78

如果不跳出标准库,多进程和pickling会有限制和问题。

如果您使用名为pathos.multiprocessingmultiprocessing分支,则可以直接在multiprocessingmap函数中使用类和类方法。这是因为使用了dill而不是picklecPickle,而dill可以序列化Python中的几乎任何内容。

pathos.multiprocessing还提供了异步map函数……并且它可以映射带有多个参数的函数(例如:map(math.pow, [1,2,3], [4,5,6])

请参见讨论: What can multiprocessing and dill do together?

以及: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

它甚至可以处理您最初编写的代码,而无需修改,并且可以从解释器中运行。 为什么要使用更脆弱和特定于单个情况的其他方法呢?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
... 
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

在这里获取代码:https://github.com/uqfoundation/pathos

另外,为了更好地展示它的功能:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]

3
pathos.multiprocessing 还有一个异步映射 (amap),可以使用进度条和其他异步编程。 - Mike McKerns
1
我喜欢pathos.multiprocessing,它可以几乎完全替代非并行映射,同时享受多进程的好处。我有一个简单的pathos.multiprocessing.map包装器,使得在跨多个核心处理只读大型数据结构时更加内存高效,请参见此git存储库 - Fashandge
1
是的。我已经有一段时间没有发布了,因为我一直在将功能拆分成单独的包,并转换为2/3兼容的代码。以上内容中的许多都已经模块化到multiprocess中,它是2/3兼容的。请参见http://stackoverflow.com/questions/27873093/installing-python-package-pathos-from-git和https://pypi.python.org/pypi/multiprocess。 - Mike McKerns
1
顺便提一下,如果你是维护者,最好加上免责声明。 - Alex Huszagh
6
作为后续说明,pathos已经有一个新的稳定版本,并且也兼容2.x和3.x。 - Mike McKerns
显示剩余9条评论

73

我也对pool.map能够接受哪种类型的函数感到不满。为了规避这个限制,我编写了以下代码。它似乎可以工作,甚至可以递归使用parmap。

from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

1
这对我非常有效,谢谢。我发现一个弱点:我尝试在一些传递defaultdict的函数上使用parmap,并再次遇到了PicklingError。我没有找到解决方法,只是重新编写了我的代码,不再使用defaultdict。 - sans
2
这在Python 2.7.2上无法正常工作(默认于2011年6月12日15:08:59使用[MSC v.1500 32位(Intel)]在win32上) - ubershmekel
3
这可以在Python 2.7.3 Aug 1, 2012, 05:14:39上运行。这不能用于巨大的可迭代对象 -> 它会导致OSError:[Errno 24] Too many open files,因为它打开了太多的管道。 - Eiyrioü von Kauyf
这个解决方案为每个工作项生成一个进程。下面"klaus se"的解决方案更加高效。 - ypnos
我有一个类似的问题 - Asmita Poddar
7
这个解决方案中是否有分类,还是说没有?它是否回答了原来的问题? - Dr_Zaszuś

42

据我所知,目前没有解决您问题的方案:您提供给map()的函数必须通过导入您的模块才能访问。这就是为什么Robert的代码可以正常工作:可以通过导入以下代码获取函数f()

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

我实际上添加了一个"main"部分,因为这遵循了Windows平台的建议(“确保主模块可以被新的Python解释器安全地导入,而不会引起意外的副作用”)。
我还在Calculate前面加了一个大写字母,以遵循PEP 8。 :)

18
mrule的解决方案是正确的,但存在一个Bug:如果子进程返回大量数据,则可以填满管道缓冲区,在子进程的 pipe.send() 上阻塞,而父进程正在等待子进程在 pipe.join() 上退出。解决方案是在join()之前读取子进程的数据。此外,为了避免死锁,子进程应关闭父进程的管道端口。下面的代码修复了这个问题。另外要注意,这个 parmap 为每个X中的元素创建一个进程。更高级的解决方案是使用multiprocessing.cpu_count()X划分为若干块,然后合并结果再返回。我把这留给读者作为练习,以不破坏mrule提供的简洁解答的优雅性。 ;)
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe,x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f,X):
    pipe=[Pipe() for x in X]
    proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p,c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x:x**x,range(1,5))

你如何选择进程数量? - patapouf_ai
然而,由于错误 OSError: [Errno 24] Too many open files,它很快就会死亡。我认为需要对进程数量进行某种限制才能使其正常工作... - patapouf_ai

15

我也曾经遇到过这个问题。举个简单的例子,我把函数作为类的数据成员:

from multiprocessing import Pool
import itertools
pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f,list1,list2)  

我需要在Pool.map()调用中使用self.f函数,但self.f不接受元组作为参数。由于该函数嵌入在类中,所以我不清楚如何编写其他答案建议的包装器类型。

我通过使用一个不同的包装器解决了这个问题,该包装器接受元组/列表,其中第一个元素是函数,其余元素是该函数的参数,称为eval_func_tuple(f_args)。使用此方法,有问题的行可以替换为return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2))。以下是完整的代码:

文件: util.py

def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])  

文件:main.py

from multiprocessing import Pool
import itertools
import util  

pool = Pool()
class Example(object):
    def __init__(self, my_add): 
        self.f = my_add  
    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple, 
            itertools.izip(itertools.repeat(self.f), list1, list2)) 

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)  

运行main.py将得到[11, 22, 33]。可以随意改进,例如eval_func_tuple也可以修改为接受关键字参数。

另外,在其他答案中,“parmap”函数可以在进程数量超过可用CPU数量的情况下更加高效。我在下面复制了一个编辑过的版本。这是我的第一篇帖子,我不确定是否应该直接编辑原始答案。我还重命名了一些变量。

from multiprocessing import Process, Pipe  
from itertools import izip  

def spawn(f):  
    def fun(pipe,x):  
        pipe.send(f(x))  
        pipe.close()  
    return fun  

def parmap(f,X):  
    pipe=[Pipe() for x in X]  
    processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)]  
    numProcesses = len(processes)  
    processNum = 0  
    outputList = []  
    while processNum < numProcesses:  
        endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses)  
        for proc in processes[processNum:endProcessNum]:  
            proc.start()  
        for proc in processes[processNum:endProcessNum]:  
            proc.join()  
        for proc,c in pipe[processNum:endProcessNum]:  
            outputList.append(proc.recv())  
        processNum = endProcessNum  
    return outputList    

if __name__ == '__main__':  
    print parmap(lambda x:x**x,range(1,5))         

14

我知道这个问题是8年10个月前提出的,但我想呈现我的解决方案:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

您只需将您的类函数转换为静态方法即可。但也可以使用类方法:

from multiprocessing import Pool

class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()

TestObject = Test()

在Python 3.7.3中进行测试


11

我知道这个问题已经被提出了6年之久,但我想分享我的解决方案。因为上面的一些建议看起来非常复杂,而我的解决方案实际上非常简单。

我所要做的就是将pool.map()调用包装到一个助手函数中。将类对象与方法参数作为元组一起传递,大致如下。

def run_in_parallel(args):
    return args[0].method(args[1])

myclass = MyClass()
method_args = [1,2,3,4,5,6]
args_map = [ (myclass, arg) for arg in method_args ]
pool = Pool()
pool.map(run_in_parallel, args_map)

9

我采用了klaus se和aganders3的答案,并创建了一个文档化的模块,更易读并且只需一个文件。您可以将其添加到您的项目中。它甚至有一个可选的进度条!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.

Adapted from https://dev59.com/SHA75IYBdhLWcg3wdIv7#16071616

Example usage:

    print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)

Comments:

"It spawns a predefined amount of workers and only iterates through the input list
 if there exists an idle worker. I also enabled the "daemon" mode for the workers so
 that KeyboardInterupt works as expected."

Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.

Alternatively, use this fork of multiprocessing: 
https://github.com/uqfoundation/multiprocess
"""

# Modules #
import multiprocessing
from tqdm import tqdm

################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))

################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t,a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]

################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

编辑: 添加了@alexander-mcfarlane建议和一个测试函数


你的进度条有一个问题... 进度条只能测量工作负载在处理器之间分配的低效性。如果工作负载被完美地分割,那么所有处理器将同时 join(),并且您将在 tqdm 显示中看到 100% 完成的一闪而过。它只有在每个处理器具有偏差的工作负载时才有用。 - Alexander McFarlane
1
tqdm()移到包装行中:result = [q_out.get() for _ in tqdm(sent)],这样会更好 - 非常感谢你的努力,所以给一个+1。 - Alexander McFarlane
谢谢你的建议,我会尝试并更新答案! - xApple
答案已更新,进度条的工作效果也更好了! - xApple
我不知道为什么,但在尝试这段代码片段时出现了错误! _pickle.PicklingError: 无法将<function <lambda> at 0x000001717B311E18>进行Pickling:__main__上的<lambda>属性查找失败 - Marine Galantin

7

在类中定义的函数(甚至是嵌套在类函数内部的函数)实际上无法进行pickle操作。不过,可以采取以下措施:

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
    return p.map(f, [1,2,3])

cl = calculate()
print cl.run()

18
谢谢,但我认为在类外定义函数有些不太规矩。类应该包含完成特定任务所需的所有内容。 - Mermoz
3
“这个类应该捆绑所有需要的东西。” 真的吗?我找不到很多这样的例子。大多数类都依赖于其他类或函数。为什么把一个类的依赖称为“脏”呢?依赖有什么问题吗? 注:本翻译尽可能地保留了原文的语气和表达方式,同时使其更加通俗易懂。 - S.Lott
好的,该函数不应修改现有的类数据--因为它会修改其他进程中的版本--所以它可以是一个静态方法。你可以对静态方法进行一定程度的封装:https://dev59.com/SkrSa4cB1Zd3GeqPZtvv#1914798或者,对于这种微不足道的事情,你可以使用lambda表达式。 - robert
@S.Lott 这里有一个例子。我有一个 FileParser 类,其中有一个方法 parse_one_file(self, filepath: str),我想将其应用于整个文件夹,使用 parse_all_files_in(self, dirpath: str): for filepath in os.listdir(dirpath): self.parse_one_file(filepath),也许这个文件夹有1000个文件,而解析一个文件需要时间,所以我用 pool.map(self.parse_one_file, os.listdir(dirpath)) 替换了我的 for 循环。这是一个简单的例子,一个干净的一行代码,但为了使它工作,我不得不在类外部放置一个等效函数,而不是一个整洁的方法?感觉很不对。 - Guimoute

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接