使用多进程模块创建的Python类对象在进程间共享

4

如何创建一个Python共享对象,以便工作进程可以修改它。我通过使用multiprocessing.Process模块创建了一些工作进程。我对multiprocessing.Manager模块有一定的了解。能否用例子解释一下如何将我的类注册到Manager中,启动Manager并创建我的类的共享对象。

1个回答

6

这是一个例子:

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager


class MySharedClass(object):
    stored_value = 0
    def get(self):
        return self.stored_value

    def set(self, new_value):
        self.stored_value = new_value
        return self.stored_value


class MyManager(BaseManager):
    pass


MyManager.register('MySharedClass', MySharedClass)

def worker ( proxy_object, i):
    proxy_object.set( proxy_object.get() + i )
    print ("id %d, sum %d" %(i, proxy_object.get()))
    return proxy_object


if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    shared = manager.MySharedClass()

    pool = Pool(5)
    for i in range(33):
        pool.apply(func=worker, args=(shared, i))
    pool.close()
    pool.join()
    print "result: %d" % shared.get()

id 0,总和为0
id 1,总和为1
id 2,总和为3
...
id 31,总和为496
id 32,总和为528
结果为:528

另一种变体(从未在实际项目中使用过):

from multiprocessing import Process, Pool
from multiprocessing.managers import BaseManager, NamespaceProxy


class MySharedClass(object):
    def __init__(self):
        self.stored_value = 0

    def get(self):
        return self.stored_value

    def set(self, new_value):
        self.stored_value = new_value
        return self.stored_value


class MyManager(BaseManager):
    pass

class MyProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')# add 'get' to use get


    #def get(self):
    #    callmethod = object.__getattribute__(self, '_callmethod')
    #    return callmethod('get')

MyManager.register('MySharedClass', MySharedClass, MyProxy)

def worker ( proxy_object, i):
    proxy_object.stored_value =  proxy_object.stored_value + i
    print ("id %d, sum %d" %(i, proxy_object.stored_value))
    return proxy_object


if __name__ == '__main__':
    manager = MyManager()
    manager.start()
    shared = manager.MySharedClass()
    print shared.stored_value

    pool = Pool(5)
    for i in range(33):
        pool.apply(func=worker, args=(shared, i))
    pool.close()
    pool.join()
    print "result: %d" % shared.stored_value

1
如果我在def init(self)方法中加入self.stored_value = 0,然后尝试像proxy_object.stored_value +=1这样访问这个变量。为什么会出现错误:AttributeError: 'AutoProxy [MySharedClass]'对象没有属性'stored_value'。 我的观点是,如果我使用multiprocessing Value()或Manager()类创建相同的变量并共享它,则可以使用对象引用运算符(.)访问它。我想使用对象引用运算符在工作进程中访问MySharedClass()的对象变量。 - Rahul.Shikhare
使用@property和<>.setter修饰符会很好。但我只找到了一种方法,以前从未使用过。它是NamespaceProxy。不知道会出现什么问题。请查看更新的答案。 - minskster
在上述解决方案中,如果我导入 'import eventlet' 并调用 'eventlet.monkey_patch()',它会报错: IOError: [Errno 11] 资源暂时不可用 - Rahul.Shikhare
我从未使用过eventlet,会尝试一下,但是我有一个应用程序,其中包含多进程、gevent(另一个并发网络库)、flask和websocket。让它们一起正常工作是个问题。我花了很多时间找到所有软件包gevent、gevent-socketio、greenlet等的兼容版本。 - minskster

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接