Python多进程PicklingError: 无法pickle <type 'function'>。

377

很抱歉我无法提供一个更简单的例子来重现错误,我的代码也太复杂了不能贴出来。如果我在IPython shell中运行程序而不是普通的Python,事情会顺利解决。

我查阅了一些关于这个问题的先前记录,它们都是由于使用pool调用类函数内定义的函数引起的。但对我来说并非如此。

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

我需要任何帮助。

更新:我使用pickle序列化的函数在模块的顶层定义。它调用一个包含嵌套函数的函数,即f()调用g()调用h(),而h()有一个嵌套函数i(),我正在调用pool.apply_async(f)f()g()h()都在顶层定义。我尝试了这种模式的简单示例,并且它可以工作。


5
顶级/被接受的答案很好,但这可能意味着您需要重新构造代码,这可能会很痛苦。我建议任何遇到此问题的人也阅读使用“ dill”和“ pathos”的其他答案。然而,当我处理vtk对象时,我没有用任何解决方案成功 :( 有人成功地在并行处理vtkPolyData中运行Python代码吗? - Chris
10个回答

428
这里是可以被pickled的列表。特别地,只有在模块顶层定义的函数才能被pickled。
这段代码:
import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

产生的错误几乎与您发布的错误相同。
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

问题在于pool方法都使用mp.SimpleQueue将任务传递给工作进程。通过mp.SimpleQueue的所有内容都必须是可pickable的,而foo.work不是pickable的,因为它没有在模块的顶层定义。
可以通过在顶层定义一个调用foo.work()的函数来解决这个问题:
def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

请注意,foo是可选的,因为Foo在顶层定义,foo.__dict__是可选的。

3
谢谢您的回复。我更新了我的问题。虽然我不认为那是原因。 - CuriousMind
16
要触发PicklingError,必须将不可被pickle处理的对象放入队列中。可能是函数或其参数导致的。为了更好地了解问题,建议您复制程序,并逐步精简它,使其变得越来越简单,每次重新运行程序以查看问题是否仍然存在。当它变得非常简单时,您要么已经发现了问题,要么可以在此处发布一些信息。 - unutbu
5
如果您在模块的顶层定义了一个函数,并且对其进行了修饰,那么引用将指向修饰器的输出,无论如何您都会遇到此错误。 - bobpoekert
9
只晚了5年,但我刚刚遇到这个问题。事实证明,“顶层”必须比通常更字面:在我的看法中,函数定义必须在池的初始化之前(即pool = Pool()此处)。我没有预料到这一点,这可能是OP问题持续存在的原因。 - Andras Deak -- Слава Україні
9
特别地,只有在模块的顶层定义的函数才可以进行pickling。似乎对于将functool.partial应用于顶层函数的结果也可以pickle,即使该函数是在另一个函数内定义的。 - user1071847
显示剩余2条评论

139

我建议使用 pathos.multiprocessing,而不是 multiprocessingpathos.multiprocessing 是基于 multiprocessing 的一个分支,它使用 dilldill 可以序列化几乎所有的 Python 对象,因此您可以并行传输更多内容。此外,pathos 分支还能够直接处理多参数函数,这对于类方法非常有用。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

在这里获取 pathos (如果您愿意,还有 dill): https://github.com/uqfoundation


5
做得很好。对于其他人,我通过以下方式安装了这两个库:sudo pip install git+https://github.com/uqfoundation/dill.git@mastersudo pip install git+https://github.com/uqfoundation/pathos.git@master - Alexander McFarlane
10
@AlexanderMcFarlane 我不建议使用sudo安装Python包(特别是从外部源例如GitHub)。相反,我建议运行:pip install --user git+... - Chris
22
现在可以使用 pip install pathos 命令,并且 pathos 库已经兼容 Python 3。 - Mike McKerns
请注意,在某些情况下,如果 pathos/dill 被迫回到标准的 multiprocess/pickle,则可能出现错误 PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed(请参考 https://github.com/uqfoundation/pathos/issues/67)。 - Francois
5
multiprocessmultiprocessing的一个分支,它在代码中的一些地方用dill替换了pickle......但基本上就是这样。pathosmultiprocess之上提供了一些额外的API层,并且还有额外的后端。但是,这就是大致的意思了。 - Mike McKerns
显示剩余7条评论

63

当使用multiprocessing时出现这个问题时,简单的解决方案是从Pool切换到ThreadPool。除了导入之外,无需更改代码即可完成此操作。

from multiprocessing.pool import ThreadPool as Pool

这种方法有效是因为 ThreadPool 与主线程共享内存,而不是创建一个新的进程- 这意味着不需要进行数据串行化。

这种方法的缺点是 Python 并不是处理多线程最好的语言 - 它使用一种叫做全局解释器锁的机制来保证线程安全,这可能会降低某些用例的速度。然而,如果您主要与其他系统交互(运行 HTTP 命令,与数据库通信,写入文件系统),那么您的代码可能不受 CPU 的限制,并且不会受到太大影响。事实上,当我在编写 HTTP/HTTPS 基准测试时,发现使用此处线程模型的开销和延迟更小,因为创建新进程的开销远高于创建新线程的开销,程序否则只是在等待 HTTP 响应。

因此,如果您在 python 用户空间处理大量任务,则这可能不是最佳方法。


43
但这样你只使用了一个 CPU(至少对于使用 GIL 的常规 Python 版本来说是这样),这有点违背了初衷。 - Endre Both
6
这要看目的是什么。全局解释器锁意味着一次只能运行一个Python代码实例,但对于那些会导致阻塞的操作(如文件系统访问、下载大型或多个文件、运行外部代码),GIL最终并不是一个问题。在某些情况下,开启新进程(而非线程)所带来的开销比GIL开销更重要。 - Robert Hafner
6
没错,谢谢。不过你可能想在回答中加上一个警告。现在随着处理能力主要以更多而不是更强大的CPU核心为形式增加,从多核心转换到单核心执行会带来相当显著的副作用。 - Endre Both
好的,我已经更新了答案并提供了更多细节。但我想指出的是,切换到线程多处理并不意味着 Python 只能在单个核心上运行。 - Robert Hafner
2
...只是想补充一下,在我的情况下(API上的HTTP请求),它非常顺利地运行(而pathos或其他方法则不行)...而且只需要改变一行import,对代码的影响真的很小。非常感谢@RobertHafner。 - undefined
2
非常感谢您的回答!只需将Pool更改为ThreadPool,它对我很有效。 - undefined

42

正如其他人所说,multiprocessing 只能传输可以被pickle的Python对象到工作进程。如果你不能按照unutbu所描述的重新组织你的代码,你可以使用dill的扩展pickling / unpickling功能来传输数据(特别是代码数据),就像我下面展示的一样。

这个解决方案只需要安装dill而不需要其他库,例如pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

17
我是“dill”和“pathos”的作者...虽然你说的没错,但使用我的答案中的“pathos”会更加简洁灵活,这不是很好吗?或者我有点偏见... - Mike McKerns
5
我写作时并不知道“pathos”的状态,想提出一种非常接近答案的解决方案。现在我看到了你的解决方案,我同意这是正确的方法。 - rocksportrocker
我看了你的解决方案,就像是“哦...我甚至没有想过这样做。”所以那还挺酷的。 - Mike McKerns
5
感谢您的发帖,我使用了这种方法来处理无法进行pickle(数据序列化)的参数:https://dev59.com/Oobca4cB1Zd3GeqPactD#27892382 - jazzblue
2
@rocksportrocker。我正在阅读这个例子,但不明白为什么要有显式的for循环。通常我会看到并行程序使用列表作为输入,而无需循环即可返回一个列表。 - user1700890
显示剩余7条评论

31

我发现,通过尝试在完美运行的代码上使用分析器,我也能生成完全相同的错误输出。

请注意,这是在Windows上运行的(其中分叉不太优雅)。

我正在运行:

python -m profile -o output.pstats <script> 

发现移除 profiling 后错误消失,添加 profiling 又复现了这个错误。这让我很困惑,因为我知道这段代码曾经是正常工作的。我一直检查 pool.py 是否有更新…… 然后有一种沉重的感觉,于是消除了 profiling,问题解决了。

在此发布以备存档,万一其他人也遇到相同问题。


3
哇,谢谢你提到这个问题!我已经被这个困扰了好几个小时了;我尝试了各种方法,甚至是一些非常简单的例子 - 但貌似都没用。不过,我同时也在我的批处理文件中运行了分析器 :( - tim
2
哦,真的非常感谢你。虽然这听起来很傻,但却是出乎意料的。我认为这应该在文档中提到。我只有一个导入pdb语句和一个简单的顶层函数,只有一个pass,但它却无法进行“pickle”。 - 0xc0de

6
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
如果您在传递给异步作业的模型对象中有任何内置函数,也会出现此错误。
因此,请确保检查传递的模型对象是否具有内置函数。(在我们的情况下,我们使用了django-model-utilsFieldTracker()函数来跟踪某个字段)。这是相关GitHub问题的链接

4
这个解决方案只需要安装 dill 库,不需要其他的库,比如 pathos。
def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

它也适用于numpy数组。


4
一个快速的解决方法是将函数变为全局函数。
from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x
    
    @staticmethod
    def test(x):
        return x**2


    def test_apply(self, list_):
        global r
        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l



if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))

对此有些好奇,这似乎是避免 pickling pyqtSignals 的最简单方法,但内存管理方面怎么样? - misantroop

0
借鉴@rocksportrocker的解决方案, 在发送和接收结果时进行dill处理是有意义的。
import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

-2

正如@penky Suresh在答案中建议的那样,不要使用内置关键字。

显然,在处理多进程时args是一个内置关键字。


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
          # Using args here is fine. 
            future_processes = {
              executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                   print(f"Generated data for comment process: {future}")
 

    # Dont use 'args' here. It seems to be a built-in keyword.
    # Changing 'args' to 'arg' worked for me.
    def process_and_render_item(arg):
        print(arg)
      # This will print {"a": "b", "c": "d"} for the first process
      # and {"e": "f", "g": "h"} for the second process.



注意:制表符/空格可能会有一点偏差。


2
这是一个糟糕的例子。代码不完整。 multiprocessing_args 未定义,TTS 未定义。它与问题无关,该问题与将函数进行 pickling 相关。你还在使用 Python 2.7 回复 9 年前的帖子。如果我可以投反对票,我会这么做。 - TLK3
2
@TLK3,你说得对。我修改了代码并添加了注释。希望现在更容易理解了。我意识到我在回复一个旧帖子,但人们仍然会在旧帖子中寻找更新的答案。 - Gru

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接