如何使用multiprocessing.Manager()?

84

我对Python中的multiprocessing.Manager()有些担忧。以下是示例:

import multiprocessing

def f(ns):
    ns.x *=10
    ns.y *= 10

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = 1
    ns.y = 2

    print 'before', ns
    p = multiprocessing.Process(target=f, args=(ns,))
    p.start()
    p.join()
    print 'after', ns

输出结果为:

before Namespace(x=1, y=2)
after Namespace(x=10, y=20)

到目前为止,它按照我的预期工作,然后我修改了代码如下:

import multiprocessing

def f(ns):
    ns.x.append(10)
    ns.y.append(10)

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = []
    ns.y = []

    print 'before', ns
    p = multiprocessing.Process(target=f, args=(ns,))
    p.start()
    p.join()
    print 'after', ns

现在的输出是:

before Namespace(x=[], y=[])
after Namespace(x=[], y=[])

我感到困惑的是,列表没有按照我预期的那样改变。有人可以帮助我弄清楚发生了什么吗?

2个回答

96
经理代理对象无法传播对容器内(非托管的)可变对象所做的更改。换句话说,如果您有一个manager.list()对象,则对托管列表本身所做的任何更改都会传播到所有其他进程。但是,如果您在该列表中拥有一个普通的Python列表内部,则对内部列表所做的任何更改都不会传播,因为管理器无法检测到更改。
为了传播更改,您必须对嵌套列表也使用manager.list()对象(需要Python 3.6或更高版本),或者直接修改manager.list()对象(请参见注释关于Python 3.5或更早版本中的manager.list)。
例如,请考虑以下代码及其输出:
import multiprocessing
import time

def f(ns, ls, di):
    ns.x += 1
    ns.y[0] += 1
    ns_z = ns.z
    ns_z[0] += 1
    ns.z = ns_z

    ls[0] += 1
    ls[1][0] += 1 # unmanaged, not assigned back
    ls_2 = ls[2]  # unmanaged...
    ls_2[0] += 1
    ls[2] = ls_2  # ... but assigned back
    ls[3][0] += 1 # managed, direct manipulation

    di[0] += 1
    di[1][0] += 1 # unmanaged, not assigned back
    di_2 = di[2]  # unmanaged...
    di_2[0] += 1
    di[2] = di_2  # ... but assigned back
    di[3][0] += 1 # managed, direct manipulation

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    ns = manager.Namespace()
    ns.x = 1
    ns.y = [1]
    ns.z = [1]
    ls = manager.list([1, [1], [1], manager.list([1])])
    di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})

    print('before', ns, ls, ls[2], di, di[2], sep='\n')
    p = multiprocessing.Process(target=f, args=(ns, ls, di))
    p.start()
    p.join()
    print('after', ns, ls, ls[2], di, di[2], sep='\n')

输出:

before
Namespace(x=1, y=[1], z=[1])
[1, [1], [1], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[1]
{0: 1, 1: [1], 2: [1], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[1]
after
Namespace(x=2, y=[1], z=[2])
[2, [1], [2], <ListProxy object, typeid 'list' at 0x10b8c4630>]
[2]
{0: 2, 1: [1], 2: [2], 3: <ListProxy object, typeid 'list' at 0x10b8c4978>}
[2]

当您直接将新值分配给托管容器时,它会发生变化;当它被分配给托管容器中的可变容器时,它不会改变;但是如果可变容器然后被重新分配给托管容器,则它会再次改变。使用嵌套的托管容器也可以工作,直接检测更改而无需分配回父容器。

20
从3.6版本开始,嵌套对象的更改会自动传播。 - max
2
我在Python 3.6.4中使用NameSpace和Manager时遇到了一些嵌套字典的问题。在继续之前,请确保您的嵌套对象被正确更新。对我来说,解决方案是明确定义每个要共享的对象为Manager对象。 - Joules
3
只要这些嵌套对象也是代理对象,那么这个答案中的代码就会嵌套常规的非代理列表。在 lsdi 中的嵌套列表需要包装在 manager.list() 函数调用中。 - Martijn Pieters
@MartijnPieters,谢谢您的澄清!我看到我完全没有理解那个评论。 - senderle

27

ns 是一个 NamespaceProxy 实例。这些对象具有特殊的 __getattr____setattr____delattr__ 方法,允许在进程之间共享值。 为了利用这种机制来更改值,必须触发 __setattr__

ns.x.append(10)

导致调用ns.__getattr__来检索ns.x,但不会导致调用ns.__setattr__

要解决这个问题,您必须使用ns.x = ...

def f(ns):   
    tmp = ns.x     # retrieve the shared value
    tmp.append(10)
    ns.x = tmp     # set the shared value

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接