在Python中深度合并字典的字典

215
我需要合并多个字典,这是我目前的例子:
dict1 = {1:{"a":{"A"}}, 2:{"b":{"B"}}}

dict2 = {2:{"c":{"C"}}, 3:{"d":{"D"}}}

A B CD 是树的叶子节点,就像 {"info1":"value", "info2":"value2"} 这样。

有一个未知层级(深度)的字典,可能是 {2:{"c":{"z":{"y":{C}}}}}

在我的情况下,它代表一个目录/文件结构,其中节点是文档,叶子是文件。

我想要合并它们以获得:

 dict3 = {1:{"a":{"A"}}, 2:{"b":{"B"},"c":{"C"}}, 3:{"d":{"D"}}}

我不确定如何用Python轻松地做到这一点。

请查看我的NestedDict类:http://stackoverflow.com/a/16296144/2334951 它可以管理嵌套字典结构,如合并等操作。 - SzieberthAdam
3
提醒所有寻找解决方案的人:本问题仅涉及嵌套字典。大多数答案无法正确处理结构中包含字典列表的更复杂情况。如果您需要此功能,请尝试@Osiloke的答案:https://dev59.com/IWw05IYBdhLWcg3wfx80#25270947 - SHernandez
参见:python dpath merge - dreftymac
@andrew cooke的解决方案的一个陷阱是,即使存在冲突错误,更改也会影响第一个字典。为了避免这个陷阱,可以使用@andrew cooke的源代码创建一个递归辅助函数,并添加一个参数,该参数具有第一个字典的克隆。该参数将被更改并返回,而不是第一个字典。请参见:https://dev59.com/IWw05IYBdhLWcg3wfx80#71700270 - diogo
可以使用Addict来合并字典:d = Dict({1:{"a":{'A'}}, 2:{"b":{'B'}}}); d.update({2:{"c":{'C'}}, 3:{"d":{'D'}}}); d => {1: {'a': {'A'}}, 2: {'b': {'B'}, 'c': {'C'}}, 3: {'d': {'D'}}} - bartolo-otrit
显示剩余2条评论
36个回答

5

概述

下面的方法将字典的深度合并问题划分为以下几个部分:

  1. 一个参数化的浅合并函数 merge(f)(a,b),使用函数 f 合并两个字典 ab

  2. 一个递归合并函数 f,与 merge 一起使用


实现

合并两个(非嵌套)字典的函数可以用很多种方式编写。我个人比较喜欢以下方式:

def merge(f):
    def merge(a,b): 
        keys = a.keys() | b.keys()
        return {key:f(a.get(key), b.get(key)) for key in keys}
    return merge

定义一个适当的递归合并函数f的好方法是使用multipledispatch,它允许定义函数根据其参数类型沿不同路径进行评估。

from multipledispatch import dispatch

#for anything that is not a dict return
@dispatch(object, object)
def f(a, b):
    return b if b is not None else a

#for dicts recurse 
@dispatch(dict, dict)
def f(a,b):
    return merge(f)(a,b)

例子

要合并两个嵌套的字典,只需使用 merge(f) ,例如:

dict1 = {1:{"a":"A"},2:{"b":"B"}}
dict2 = {2:{"c":"C"},3:{"d":"D"}}
merge(f)(dict1, dict2)
#returns {1: {'a': 'A'}, 2: {'b': 'B', 'c': 'C'}, 3: {'d': 'D'}} 

注意:

这种方法的优点是:

  • 函数是由较小的函数构建而成的,每个函数只做一件事情,使得代码更简单易于理解和测试。

  • 行为不是硬编码的,而是可以根据需要进行更改和扩展,从而提高代码的重用性(请参见下面的示例)。


自定义

有些答案还考虑了包含其他(可能嵌套)字典的列表的字典。在这种情况下,可能希望映射列表并根据位置合并它们。这可以通过将另一个定义添加到合并函数 f 中来完成:

import itertools
@dispatch(list, list)
def f(a,b):
    return [merge(f)(*arg) for arg in itertools.zip_longest(a, b)]

5

简洁明了:

from collections.abc import MutableMapping as Map

def nested_update(d, v):
    """
    Nested update of dict-like 'd' with dict-like 'v'.
    """

    for key in v:
        if key in d and isinstance(d[key], Map) and isinstance(v[key], Map):
            nested_update(d[key], v[key])
        else:
            d[key] = v[key]

这个函数的工作方式类似于(并且是基于)Python的dict.update方法。它返回None(如果您喜欢,您可以随时添加return d),因为它会直接在字典d中进行更新。在v中的键将覆盖d中的任何现有键(它不尝试解释字典的内容)。

它也适用于其他(“类似字典”的)映射。

示例:

people = {'pete': {'gender': 'male'}, 'mary': {'age': 34}}
nested_update(people, {'pete': {'age': 41}})

# Pete's age was merged in
print(people)
{'pete': {'gender': 'male', 'age': 41}, 'mary': {'age': 34}}

当Python的常规dict.update方法产生以下结果时:

people = {'pete': {'gender': 'male'}, 'mary': {'age': 34}}
people.update({'pete': {'age': 41}})

# We lost Pete's gender here!
print(people) 
{'pete': {'age': 41}, 'mary': {'age': 34}}

3

关于andrew cookes的回答,有一个小问题:当你修改返回的字典时,它有时会修改第二个参数b。具体来说,是因为这一行代码:

if key in a:
    ...
else:
    a[key] = b[key]

如果b[key]是一个dict,它将简单地被赋值给a,这意味着对该dict的任何后续修改都会影响到ab
a={}
b={'1':{'2':'b'}}
c={'1':{'3':'c'}}
merge(merge(a,b), c) # {'1': {'3': 'c', '2': 'b'}}
a # {'1': {'3': 'c', '2': 'b'}} (as expected)
b # {'1': {'3': 'c', '2': 'b'}} <----
c # {'1': {'3': 'c'}} (unmodified)

为了解决这个问题,需要将该行替换为以下内容:
if isinstance(b[key], dict):
    a[key] = clone_dict(b[key])
else:
    a[key] = b[key]

其中,clone_dict是:

def clone_dict(obj):
    clone = {}
    for key, value in obj.iteritems():
        if isinstance(value, dict):
            clone[key] = clone_dict(value)
        else:
            clone[key] = value
    return

然而,这显然没有考虑到listset和其他内容,但我希望它能说明在尝试合并dicts时可能会遇到的问题。

为了完整起见,这是我的版本,您可以将多个dicts传递给它:

def merge_dicts(*args):
    def clone_dict(obj):
        clone = {}
        for key, value in obj.iteritems():
            if isinstance(value, dict):
                clone[key] = clone_dict(value)
            else:
                clone[key] = value
        return

    def merge(a, b, path=[]):
        for key in b:
            if key in a:
                if isinstance(a[key], dict) and isinstance(b[key], dict):
                    merge(a[key], b[key], path + [str(key)])
                elif a[key] == b[key]:
                    pass
                else:
                    raise Exception('Conflict at `{path}\''.format(path='.'.join(path + [str(key)])))
            else:
                if isinstance(b[key], dict):
                    a[key] = clone_dict(b[key])
                else:
                    a[key] = b[key]
        return a
    return reduce(merge, args, {})

为什么不使用deepcopy而是使用clone_dict呢?deepcopy - Armando Pérez Marqués
1
因为Python标准库非常庞大而且宏伟!我之前毫不知情——而且编写这个小程序还挺有趣的 :-) - andsens

3
正如其他答案所指出的,递归算法在这里最有意义。一般来说,在使用递归时,最好创建新值而不是尝试修改任何输入数据结构。
我们需要定义每个合并步骤发生的情况。如果两个输入都是字典,那么这很容易:我们从每一侧复制唯一的键,并递归合并重复键的值。问题在于基本情况。如果我们提取一个单独的函数作为占位符,那么逻辑会更加容易理解,我们可以将两个值包装在元组中。
def merge_leaves(x, y):
    return (x, y)

现在我们的逻辑核心看起来像这样:
def merge(x, y):
    if not(isinstance(x, dict) and isinstance(y, dict)):
        return merge_leaves(x, y)
    x_keys, y_keys = x.keys(), y.keys()
    result = { k: merge(x[k], y[k]) for k in x_keys & y_keys }
    result.update({k: x[k] for k in x_keys - y_keys})
    result.update({k: y[k] for k in y_keys - x_keys})
    return result

让我们来测试一下:

>>> x = {'a': {'b': 'c', 'd': 'e'}, 'f': 1, 'g': {'h', 'i'}, 'j': None}
>>> y = {'a': {'d': 'e', 'h': 'i'}, 'f': {'b': 'c'}, 'g': 1, 'k': None}
>>> merge(x, y)
{'f': (1, {'b': 'c'}), 'g': ({'h', 'i'}, 1), 'a': {'d': ('e', 'e'), 'b': 'c', 'h': 'i'}, 'j': None, 'k': None}
>>> x # The originals are unmodified.
{'a': {'b': 'c', 'd': 'e'}, 'f': 1, 'g': {'h', 'i'}, 'j': None}
>>> y
{'a': {'d': 'e', 'h': 'i'}, 'f': {'b': 'c'}, 'g': 1, 'k': None}

我们可以轻松修改叶子合并规则,例如:

def merge_leaves(x, y):
    try:
        return x + y
    except TypeError:
        return Ellipsis

观察效果:

>>> merge(x, y)
{'f': Ellipsis, 'g': Ellipsis, 'a': {'d': 'ee', 'b': 'c', 'h': 'i'}, 'j': None, 'k': None}

我们还可以通过使用第三方库根据输入类型进行分派来潜在地简化这个过程。例如,使用multipledispatch,我们可以做到以下几点:

@dispatch(dict, dict)
def merge(x, y):
    x_keys, y_keys = x.keys(), y.keys()
    result = { k: merge(x[k], y[k]) for k in x_keys & y_keys }
    result.update({k: x[k] for k in x_keys - y_keys})
    result.update({k: y[k] for k in y_keys - x_keys})
    return result

@dispatch(str, str)
def merge(x, y):
    return x + y

@dispatch(tuple, tuple)
def merge(x, y):
    return x + y

@dispatch(list, list)
def merge(x, y):
    return x + y

@dispatch(int, int):
def merge(x, y):
    raise ValueError("integer value conflict")

@dispatch(object, object):
    return (x, y)

这样可以处理各种叶子类型的特殊情况,而无需编写自己的类型检查,并且还可以替换主递归函数中的类型检查。

3

我有一个迭代的解决方案 - 对于大字典及其数量较多(例如json等)效果要好得多:

import collections


def merge_dict_with_subdicts(dict1: dict, dict2: dict) -> dict:
    """
    similar behaviour to builtin dict.update - but knows how to handle nested dicts
    """
    q = collections.deque([(dict1, dict2)])
    while len(q) > 0:
        d1, d2 = q.pop()
        for k, v in d2.items():
            if k in d1 and isinstance(d1[k], dict) and isinstance(v, dict):
                q.append((d1[k], v))
            else:
                d1[k] = v

    return dict1

请注意,如果d1和d2不都是字典,则此操作将使用d2中的值覆盖d1中的值。(与Python的dict.update()相同)
一些测试:
def test_deep_update():
    d = dict()
    merge_dict_with_subdicts(d, {"a": 4})
    assert d == {"a": 4}

    new_dict = {
        "b": {
            "c": {
                "d": 6
            }
        }
    }
    merge_dict_with_subdicts(d, new_dict)
    assert d == {
        "a": 4,
        "b": {
            "c": {
                "d": 6
            }
        }
    }

    new_dict = {
        "a": 3,
        "b": {
            "f": 7
        }
    }
    merge_dict_with_subdicts(d, new_dict)
    assert d == {
        "a": 3,
        "b": {
            "c": {
                "d": 6
            },
            "f": 7
        }
    }

    # test a case where one of the dicts has dict as value and the other has something else
    new_dict = {
        'a': {
            'b': 4
        }
    }
    merge_dict_with_subdicts(d, new_dict)
    assert d['a']['b'] == 4

我已经测试了约1200个字典 - 这种方法只需要0.4秒,而递归解决方案需要约2.5秒。


3
您可以使用来自 toolz 包的 merge 函数,例如:
>>> import toolz
>>> dict1 = {1: {'a': 'A'}, 2: {'b': 'B'}}
>>> dict2 = {2: {'c': 'C'}, 3: {'d': 'D'}}
>>> toolz.merge_with(toolz.merge, dict1, dict2)
{1: {'a': 'A'}, 2: {'c': 'C'}, 3: {'d': 'D'}}

2

由于字典视图支持集合操作,我能够大大简化jterrace的答案。

def merge(dict1, dict2):
    for k in dict1.keys() - dict2.keys():
        yield (k, dict1[k])

    for k in dict2.keys() - dict1.keys():
        yield (k, dict2[k])

    for k in dict1.keys() & dict2.keys():
        yield (k, dict(merge(dict1[k], dict2[k])))

任何试图将字典与非字典(技术上,具有“keys”方法和不具有“keys”方法的对象)结合的尝试都会引发AttributeError。这包括对函数的初始调用和递归调用。这正是我想要的,所以我保留了它。您可以轻松捕获由递归调用抛出的AttributeError,然后产生任何值。

2
这个函数的版本可以考虑 N 个字典,只接受字典类型的参数 -- 如果传入不合适的参数,它会引发 TypeError 异常。合并本身会处理键冲突,而不是覆盖来自合并链下一个字典的数据,它创建了一组值并将其附加到其中;没有数据丢失。
它可能不是页面上最高效的,但是它是最彻底的,当你合并 2 到 N 个字典时,你不会失去任何信息。
def merge_dicts(*dicts):
    if not reduce(lambda x, y: isinstance(y, dict) and x, dicts, True):
        raise TypeError, "Object in *dicts not of type dict"
    if len(dicts) < 2:
        raise ValueError, "Requires 2 or more dict objects"


    def merge(a, b):
        for d in set(a.keys()).union(b.keys()):
            if d in a and d in b:
                if type(a[d]) == type(b[d]):
                    if not isinstance(a[d], dict):
                        ret = list({a[d], b[d]})
                        if len(ret) == 1: ret = ret[0]
                        yield (d, sorted(ret))
                    else:
                        yield (d, dict(merge(a[d], b[d])))
                else:
                    raise TypeError, "Conflicting key:value type assignment"
            elif d in a:
                yield (d, a[d])
            elif d in b:
                yield (d, b[d])
            else:
                raise KeyError

    return reduce(lambda x, y: dict(merge(x, y)), dicts[1:], dicts[0])

print merge_dicts({1:1,2:{1:2}},{1:2,2:{3:1}},{4:4})

输出结果: {1: [1, 2], 2: {1: 2, 3: 1}, 4: 4}


2

以下函数将 b 合并到 a 中。

def mergedicts(a, b):
    for key in b:
        if isinstance(a.get(key), dict) or isinstance(b.get(key), dict):
            mergedicts(a[key], b[key])
        else:
            a[key] = b[key]
    return a

1
我有两个字典(ab),每个字典都可以包含任意数量的嵌套字典。我想要递归地合并它们,以b为优先。
将嵌套字典视为树,我的目标是:
  • 更新a,使b中每个叶子节点的路径在a中表示
  • 如果在b的相应路径中找到一个叶子节点,则覆盖a的子树
    • 保持所有b叶子节点仍为叶子节点的不变量。
现有的答案对我来说有点复杂,并且留下了一些细节。我拼凑出了以下内容,它通过了我的数据集的单元测试。
  def merge_map(a, b):
    if not isinstance(a, dict) or not isinstance(b, dict):
      return b

    for key in b.keys():
      a[key] = merge_map(a[key], b[key]) if key in a else b[key]
    return a

例子(为了清晰起见进行格式化):
 a = {
    1 : {'a': 'red', 
         'b': {'blue': 'fish', 'yellow': 'bear' },
         'c': { 'orange': 'dog'},
    },
    2 : {'d': 'green'},
    3: 'e'
  }

  b = {
    1 : {'b': 'white'},
    2 : {'d': 'black'},
    3: 'e'
  }


  >>> merge_map(a, b)
  {1: {'a': 'red', 
       'b': 'white',
       'c': {'orange': 'dog'},},
   2: {'d': 'black'},
   3: 'e'}

需要维护的b中的路径为:

  • 1 -> 'b' -> 'white'
  • 2 -> 'd' -> 'black'
  • 3 -> 'e'.

a具有唯一且不冲突的路径:

  • 1 -> 'a' -> 'red'
  • 1 -> 'c' -> 'orange' -> 'dog'

因此它们仍在合并后的映射中表示。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接