如何在Python中迭代字典代理?

18
我正在使用Python的multiprocessing.Manager来共享一个数据集,其中一个进程将生成它,其他进程将查看它。然而,我遇到了这样一种问题,即由manager.dict()返回的字典代理不支持iteritems()
我可以遍历items(),但这意味着构造字典中所有项的新元组,这是一个很大的数字。是否有一种方法可以在不构造中间列表/元组的情况下完成它,从而仅使用恒定量的额外内存?
注意:如果解决方案需要生成进程暂停迭代,那也没有问题。

你考虑过使用SyncManager并在其中注册你自己的代理,使得iteritems暴露出来吗? - oleg
1
@oleg,你不能简单地暴露iteritems,因为它返回的字典迭代器是不可pickle的。这就是为什么默认的dict代理不会暴露它,从而引出了这个问题。 - otus
我并没有说“只是”暴露。:) 我们可以使用IteratorProxy来暴露iteritems吗? - oleg
@oleg 对不起,如果我的回答显得不屑一顾。我相信某种代理是一个解决方案,但我不知道如何构建它。 - otus
3个回答

4
您可以使用keys()迭代来减少内存占用。但是,您需要注意保护好被删除的键。
否则,以下是两种不同方法的示例,可让您遍历字典中的项。在此示例中,iteritems()方法仅适用于创建管理器对象和管理器对象创建的子进程。这是因为管理器对象需要创建新的代理,而其他进程无法访问它。iteritems2()方法可以从其他进程中使用,因为它不依赖于在这些进程中创建新的代理。
import multiprocessing as mp
import multiprocessing.managers

class mydict(dict):
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.iters = {}

    def iteritems(self):
        print "iteritems", mp.current_process()
        return dict.iteritems(self)

    def _iteritems_start(self):
        print "_iteritems_start", mp.current_process()
        i = dict.iteritems(self)
        self.iters[id(i)] = i
        return id(i)

    def _iteritems_next(self, iter_id):
        try:
            return self.iters[iter_id].next()
        except StopIteration:
            del self.iters[iter_id]
            return None

class mydict_proxy(mp.managers.DictProxy):
    def iteritems(self):
        print "iteritems proxy", mp.current_process()
        return self._callmethod("iteritems")

    def iteritems2(self):
        print "iteritems2 proxy", mp.current_process()
        iter_id = self._callmethod("_iteritems_start")
        def generator():
            while True:
                a = self._callmethod("_iteritems_next", 
                             (iter_id,))
                if a == None:
                    return
                yield a
        return generator()

    _method_to_typeid_ = { "iteritems": "Iterator" }
    _exposed_ = mp.managers.DictProxy._exposed_
    _exposed_ += ("iteritems", "_iteritems_start", "_iteritems_next")

class mymanager(mp.managers.BaseManager):
    pass
mymanager.register("mydict", mydict, mydict_proxy)
mymanager.register("Iterator", proxytype = mp.managers.IteratorProxy,
           create_method = False)

def other(d):
    for k, v in d.iteritems2():
        d[k] = v.lower()
    for k, v in d.iteritems():
        d[k] = ord(v)

def main():
    manager = mymanager()
    manager.start()
    d = manager.mydict(list(enumerate("ABCDEFGHIJKLMNOP")))
    for (k, v) in d.iteritems():
        print k, v
    proc = mp.Process(target = other, args = (d,))
    proc.start()
    proc.join()
    for (k, v) in d.iteritems():
        print k, v

if __name__ == "__main__":
    main()

请注意,尽管这段代码可能更加省内存,但它的速度可能会慢得多。

0

你可以使用SyncManager类来注册自己的类型。然后你可以在该类型上实现方法,例如从字典中仅获取有限数量的项。

下面是一个示例,帮助你入门:

import multiprocessing
from multiprocessing import managers


class TakerDict(dict):
    """Like a dict, but allows taking a limited number of items."""

    def take(self, items=1):
        """Take the first `items` items."""
        return [item for _, item in zip(range(items), self.items())]


# NOTE: add other dict methods to the tuple if you need them.
TakerProxy = managers.MakeProxyType('TakerProxy', ('take',))

managers.SyncManager.register('taker', TakerDict, TakerProxy)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    taker = manager.taker()
    # in other processes, use e.g. taker.take(5)

因此,为了限制内存使用,您需要反复调用管理进程以获取下一批元素。

但是,要做到这一点,您的字典必须支持索引(以便您可以从特定偏移处恢复)。由于您无法访问字典中元素的基础顺序,因此最好改用列表(例如 manager.list())。然后在子进程中,请求列表的len(),并通过切片进行索引,以获得适当大小的批次 - 您不需要为此注册任何代理类型。


3
你的做法基本上就是实现我在问题中提到的“转换为列表”的解决方法,只不过方式有些复杂。这并没有真正解决需要列表导致的内存使用问题。 - otus
好的,这最终确实将数据转换为列表,因此它会带来内存开销。只是以块的方式执行,因此开销不会太大。我认为它的性能不会比“IteratorProxy”方法差,但我没有测量过任何东西。 - Attila O.
除了它实际上不执行块:"要做到这一点,您的字典必须支持索引(以便您可以从特定偏移量恢复)"。 - otus
你说得对。我现在明白这并没有真正解决你的问题(除非你将数据类型从字典改为列表,但这可能不是理想的选择)。- 我应该删除这个答案还是它仍然有用? - Attila O.
1
我认为这可能会帮助到需要解决问题的人找到真正的解决方案。 - otus

-2

iteritems() 适用于 列表 字典。您可以使用 for 循环。或者您可以使用 sorted(),它将返回一个排序后的键列表,然后迭代该列表并执行 dict[key]。希望这有所帮助。如果有更好的方法,请与我分享。我渴望知道。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接