Python NotImplementedError: 进程间不能传递池对象

24

当页面添加到页面列表时,我尝试传递工作,但我的代码输出返回了NotImplementedError。以下是我尝试操作的代码:

代码:

from multiprocessing import Pool, current_process
import time
import random
import copy_reg
import types
import threading


class PageControler(object):
    def __init__(self):
        self.nProcess = 3
        self.pages = [1,2,3,4,5,6,7,8,9,10]
        self.manageWork()


    def manageWork(self):

        self.pool = Pool(processes=self.nProcess)

        time.sleep(2)
        work_queue = threading.Thread(target=self.modifyQueue)
        work_queue.start()

        #pool.close()
        #pool.join()

    def deliverWork(self):    
        if self.pages != []:
            pag = self.pages.pop()
            self.pool.apply_async(self.myFun)


    def modifyQueue(self):
        t = time.time()
        while (time.time()-t) < 10:
            time.sleep(1)
            self.pages.append(99)
            print self.pages
            self.deliverWork()

    def myFun(self):
        time.sleep(2)


if __name__ == '__main__':
    def _pickle_method(m):
        if m.im_self is None:
            return getattr, (m.im_class, m.im_func.func_name)
        else:
            return getattr, (m.im_self, m.im_func.func_name)

    copy_reg.pickle(types.MethodType, _pickle_method)

    PageControler()

输出:

NotImplementedError: pool objects cannot be passed between processes or pickled

有没有办法在进程之间传递池对象?
编辑: 我正在使用Python 2.6。

Python 2.7,我猜是吧? - Anthony Kong
Python 2.6,但我已经阅读过Python 2.7也有同样的问题。 - flaviussn
2个回答

43
为了pickle您正在尝试传递给Pool的实例方法,Python需要pickle整个PageControler对象,包括其实例变量。其中一个实例变量是Pool对象本身,而Pool对象无法被pickled,因此出现错误。您可以通过在对象上实现__getstate__,并在pickle之前使用它来从实例中删除pool对象,以解决此问题。
class PageControler(object):
    def __init__(self):
        self.nProcess = 3
        self.pages = [1,2,3,4,5,6,7,8,9,10]
        self.manageWork()


    def manageWork(self):

        self.pool = Pool(processes=self.nProcess)

        time.sleep(2)
        work_queue = threading.Thread(target=self.modifyQueue)
        work_queue.start()

        #pool.close()
        #pool.join()

    def deliverWork(self):    
        if self.pages != []:
            pag = self.pages.pop()
            self.pool.apply_async(self.myFun)


    def modifyQueue(self):
        t = time.time()
        while (time.time()-t) < 10:
            time.sleep(1)
            self.pages.append(99)
            print self.pages
            self.deliverWork()

    def myFun(self):
        time.sleep(2)

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['pool']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

__getstate__ 在对一个对象进行pickle之前总是会被调用,它允许您明确地指定哪些对象状态的部分应该被pickle。然后在反序列化时,如果实现了__setstate__(state)方法(我们的情况中有),则会调用该方法,否则会使用由__getstate__返回的dict作为未pickle实例的__dict__。在上面的示例中,我们明确将__dict__设置为我们在__getstate__中返回的dict,但我们也可以不实现__setstate__并获得相同的效果。


1
谢谢你的帮助,再次感谢! - flaviussn
这是一个非常好的答案,帮助我解决了问题。它引导我在Python3中找到了这个链接,可能对其他人有用:https://docs.python.org/3/library/pickle.html#object.__getstate__ - NYCeyes
太棒了!你很厉害,@dano!谢谢。 - Andrés Pérez-Albela H.
1
哇,多好的回答啊。我现在终于明白过去两个小时发生了什么了。谢谢@dano。 - Keith Brodie
这个解决方案解决了我的问题。它看起来就像JAVA中的@transient。 - runzhi xiao

2
Dano的回答是一个好方法,如果你必须将整个对象传递给进程。在你的情况下,你传递给池的函数不需要引用类实例。因此,另一种选择是使用@staticmethod装饰器使函数成为静态方法。如果函数确实需要引用一个或两个类成员变量,这些变量可以作为只读变量传递,并在回调中进行更新,如果需要写入也可以进行更新(当然,在任何情况下,如果你想更新本地类实例,你都需要这样做)。
例如:
Class A(object):

    def __init__(self):
        self._pool = multiprocessing.Pool(1)
        self.member_variable = 1

    @staticmethod
    def MyFunc(variable):
        variable += 1
        return variable

    def Callback(self, return_val):
        self.member_variable = return_val

    def CallFuncAsync(self):
        pool.apply_async(self.MyFunc, (self.member_variable,), callback=self.Callback)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接