使用defaultdict和多进程?

13

我只是在实验和学习,我知道如何创建可以被多个进程访问的共享字典,但我不确定如何保持字典同步。我相信defaultdict说明了我遇到的问题。

from collections import defaultdict
from multiprocessing import Pool, Manager, Process

#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
    d[k] += 1

print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = Manager()
    multi_d = mgr.dict()
    for k in s:
        pool.apply_async(test, (k, multi_d))

    # Mark pool as closed -- no more tasks can be added.
    pool.close()

    # Wait for tasks to exit
    pool.join()

    # Output results
    print multi_d.items()  #FAIL

print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
    multi_dict2[k] += 1


if __name__ == '__main__':
    manager = Manager()

    multi_d2 = manager.dict()
    for k in s:
        p = Process(target=test2, args=(k, multi_d2))
    p.start()
    p.join()

    print multi_d2 #FAIL
第一个结果有效(因为它没有使用multiprocessing),但我在尝试使用multiprocessing时遇到了问题。我不确定如何解决这个问题,但我认为可能是由于未同步(稍后再合并结果)或者在multiprocessing内部我无法设置defaultdict(int)到字典中的原因。希望能得到任何有关如何使其工作的帮助或建议!
2个回答

20

您可以继承BaseManager并注册额外的类型以进行共享。在默认生成的AutoProxy类型不可用的情况下,您需要提供适当的代理类型。对于defaultdict,如果您只需要访问已经存在于dict中的属性,则可以使用DictProxy

from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict

class MyManager(BaseManager):
    pass

MyManager.register('defaultdict', defaultdict, DictProxy)

def test(k, multi_dict):
    multi_dict[k] += 1

if __name__ == '__main__':
    pool = Pool(processes=4)
    mgr = MyManager()
    mgr.start()
    multi_d = mgr.defaultdict(int)
    for k in 'mississippi':
        pool.apply_async(test, (k, multi_d))
    pool.close()
    pool.join()
    print multi_d.items()

1
哇,它起作用了,谢谢。我不太理解你的修改,MyManager(BaseManager类)的目的是什么? - Lostsoul
@Lostsoul 这是文档化的方法,用于添加支持共享Manager不支持的其他类型。 - Janne Karila
@JanneKarila,你知道在哪里可以找到所有代理类型的列表吗? - Grr
@Grr 请查看managers.py源代码 - Janne Karila
很惊讶这些集合没有被原始管理器类支持,但非常感谢您让我们知道它是可能的! - galactica
显示剩余5条评论

4

嗯,看起来Manager类似乎只提供了一些预定义的数据结构,这些数据结构可以在进程之间共享,而defaultdict不在其中。如果你确实只需要那个defaultdict,最简单的解决方法是自己实现默认行为:

def test(k, multi_dict):
    if k not in multi_dict:
        multi_dict[k] = 0
    multi_dict[k] += 1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接