如何将多层/嵌套的JSON数据扁平化?

42

我正在尝试将JSON转换为CSV文件,以便进行进一步的分析。我的结构问题在于,当我转换我的JSON文件时,它有相当多嵌套的字典/列表。

我试过使用pandas的json_normalize(),但它只能展平第一级。

import json
import pandas as pd
from pandas.io.json import json_normalize
from cs import CloudStack

api_key = xxxx
secret = xxxx
endpoint = xxxx

cs = CloudStack(endpoint=endpoint,
                key=api_key,
                secret=secret)

virtual_machines = cs.virtMach()

test = json_normalize(virtual_machines["virtualmachine"])

test.to_csv("test.csv", sep="|", index=False)

有没有想法可以将整个JSON文件扁平化,这样我就可以为单个条目(在这种情况下是虚拟机)创建单行输入到CSV文件中?我尝试了这里发布的几个解决方案,但我的结果总是只有第一级被扁平化。

这是示例JSON(在这种情况下,我仍然得到“securitygroup”和“nic”输出为JSON格式:

{
    "count": 13,
    "virtualmachine": [
        {
            "id": "1082e2ed-ff66-40b1-a41b-26061afd4a0b",
            "name": "test-2",
            "displayname": "test-2",
            "securitygroup": [
                {
                    "id": "9e649fbc-3e64-4395-9629-5e1215b34e58",
                    "name": "test",
                    "tags": []
                }
            ],
            "nic": [
                {
                    "id": "79568b14-b377-4d4f-b024-87dc22492b8e",
                    "networkid": "05c0e278-7ab4-4a6d-aa9c-3158620b6471"
                },
                {
                    "id": "3d7f2818-1f19-46e7-aa98-956526c5b1ad",
                    "networkid": "b4648cfd-0795-43fc-9e50-6ee9ddefc5bd"
                    "traffictype": "Guest"
                }
            ],
            "hypervisor": "KVM",
            "affinitygroup": [],
            "isdynamicallyscalable": false
        }
    ]
}

3
这里有很好的例子(链接在此)。那篇提到的“flatten json”函数应该正好符合您的需求。如果这有帮助,请告诉我。 - gyx-hh
1
你好,这个链接确实非常有帮助。虽然只是部分解决了我的问题,但现在所有的东西都被展平了,而不仅仅是内部字典。但我在那里找到了完全相同的问题,这导致了json_normalization()文档的出现,该文档显示您可以指定导出的深度。链接 - Bostjan
是的,json_normalize非常有用!试一试并告诉我们效果如何。 - gyx-hh
所以基本上这个方法可行 - 我使用了json_normalization(),在其中定义了输出结构,就像上面链接中提到的那样。再次感谢gyx提供的所有帮助。您能否将其发布为答案,这样我就可以将其标记为解决方案吗? - Bostjan
太好了。你现在可以发布自己的答案(因为你已经回答了它),并将其标记为答案 :) - gyx-hh
在“traffictype”之前需要添加逗号(,)。 - Nikhil VJ
10个回答

64

我使用了以下函数(详细信息可以在此处找到:这里):

def flatten_data(y):
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '_')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '_')
                i += 1
        else:
            out[name[:-1]] = x

    flatten(y)
    return out

很遗憾,这将完全展平整个JSON,这意味着,如果您有多级JSON(许多嵌套的字典),它可能会将所有内容展平为单行,并具有大量列。

最终我使用的是 json_normalize() 并指定了所需的结构。如何以这种方式完成的良好示例可以在这里找到。


有没有办法避免将数组在最后一级展平? - ScipioAfricanus

16
https://dev59.com/fm025IYBdhLWcg3wZlLd#62186053跨帖(但需要进一步适应):在这个repo中:https://github.com/ScriptSmith/socialreaper/blob/master/socialreaper/tools.py#L8,我找到了一个实现@roneo的列表包含评论@Imran发布的答案的方法。
我添加了检查以捕获空列表和空字典。并且还添加了打印行,这将有助于理解此函数的工作原理。您可以通过设置crumbs=False关闭这些打印语句。
from collections import MutableMapping
crumbs = True
def flatten(dictionary, parent_key=False, separator='.'):
    """
    Turn a nested dictionary into a flattened dictionary
    :param dictionary: The dictionary to flatten
    :param parent_key: The string to prepend to dictionary's keys
    :param separator: The string used to separate flattened keys
    :return: A flattened dictionary
    """

    items = []
    for key, value in dictionary.items():
        if crumbs: print('checking:',key)
        new_key = str(parent_key) + separator + key if parent_key else key
        if isinstance(value, MutableMapping):
            if crumbs: print(new_key,': dict found')
            if not value.items():
                if crumbs: print('Adding key-value pair:',new_key,None)
                items.append((new_key,None))
            else:
                items.extend(flatten(value, new_key, separator).items())
        elif isinstance(value, list):
            if crumbs: print(new_key,': list found')
            if len(value):
                for k, v in enumerate(value):
                    items.extend(flatten({str(k): v}, new_key, separator).items())
            else:
                if crumbs: print('Adding key-value pair:',new_key,None)
                items.append((new_key,None))
        else:
            if crumbs: print('Adding key-value pair:',new_key,value)
            items.append((new_key, value))
    return dict(items)

测试它:

ans = flatten({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3], 'e':{'f':[], 'g':{}} })
print('\nflattened:',ans)

输出:

checking: a
Adding key-value pair: a 1
checking: c
c : dict found
checking: a
Adding key-value pair: c.a 2
checking: b
c.b : dict found
checking: x
Adding key-value pair: c.b.x 5
checking: y
Adding key-value pair: c.b.y 10
checking: d
d : list found
checking: 0
Adding key-value pair: d.0 1
checking: 1
Adding key-value pair: d.1 2
checking: 2
Adding key-value pair: d.2 3
checking: e
e : dict found
checking: f
e.f : list found
Adding key-value pair: e.f None
checking: g
e.g : dict found
Adding key-value pair: e.g None

flattened: {'a': 1, 'c.a': 2, 'c.b.x': 5, 'c.b.y': 10, 'd.0': 1, 'd.1': 2, 'd.2': 3, 'e.f': None, 'e.g': None}

这就是我需要完成的任务:我将任何复杂的json传递给它,它会为我压平。我还添加了一个检查原始代码以处理空列表。

感谢https://github.com/ScriptSmith的仓库,我在其中找到了最初的压平函数。

测试OP的示例json,以下是输出:

{'count': 13,
 'virtualmachine.0.id': '1082e2ed-ff66-40b1-a41b-26061afd4a0b',
 'virtualmachine.0.name': 'test-2',
 'virtualmachine.0.displayname': 'test-2',
 'virtualmachine.0.securitygroup.0.id': '9e649fbc-3e64-4395-9629-5e1215b34e58',
 'virtualmachine.0.securitygroup.0.name': 'test',
 'virtualmachine.0.securitygroup.0.tags': None,
 'virtualmachine.0.nic.0.id': '79568b14-b377-4d4f-b024-87dc22492b8e',
 'virtualmachine.0.nic.0.networkid': '05c0e278-7ab4-4a6d-aa9c-3158620b6471',
 'virtualmachine.0.nic.1.id': '3d7f2818-1f19-46e7-aa98-956526c5b1ad',
 'virtualmachine.0.nic.1.networkid': 'b4648cfd-0795-43fc-9e50-6ee9ddefc5bd',
 'virtualmachine.0.nic.1.traffictype': 'Guest',
 'virtualmachine.0.hypervisor': 'KVM',
 'virtualmachine.0.affinitygroup': None,
 'virtualmachine.0.isdynamicallyscalable': False}

所以你会看到'tags'和'affinitygroup'键也被处理并添加到输出中。原始代码忽略了它们。

2021-05-30:更新:collections.MutableMapping已更改为collections.abc.MutableMapping

2023-01-11:编辑,根据@MHebes的建议,在第二个items.extend()调用中添加了分隔符参数。


需要将items.extend(flatten({str(k): v}, new_key).items())中的seperator参数进行转发。 - MHebes
@MHebes 抱歉,我没听懂你的意思。你能给出修改后的代码行吗?否则,请随意提交另一个带有改进代码的答案。 - Nikhil VJ
当然,修复方法是 items.extend(flatten({str(k): v}, new_key, separator).items())。这确保了 separator 参数传递到嵌套级别。您在 if ... MutableMapping 分支中执行此操作,但未在 elif ... list 分支中执行。 - MHebes
1
@MHebes 进行了更正。 - Nikhil VJ

14

IMO 接受的回答 没有正确处理JSON数组。

如果JSON对象的值是数组,则应将其展平为对象数组,例如

{'a': [1, 2]} -> [{'a': 1}, {'a': 2}]

不要将索引添加到键中。

嵌套对象应该通过连接键(例如使用句点作为分隔符)来展平。

{'a': {'b': 1}} -> {'a.b': 1}

(在被接受的一种方法中)这样做是正确的。

考虑到所有这些要求,我最终得出了以下结果(开发和使用于CPython3.5.3):

from functools import (partial,
                       singledispatch)
from itertools import chain
from typing import (Dict,
                    List,
                    TypeVar)

Serializable = TypeVar('Serializable', None, int, bool, float, str,
                       dict, list, tuple)
Array = List[Serializable]
Object = Dict[str, Serializable]


def flatten(object_: Object,
            *,
            path_separator: str = '.') -> Array[Object]:
    """
    Flattens given JSON object into list of objects with non-nested values.

    >>> flatten({'a': 1})
    [{'a': 1}]
    >>> flatten({'a': [1, 2]})
    [{'a': 1}, {'a': 2}]
    >>> flatten({'a': {'b': None}})
    [{'a.b': None}]
    >>> flatten({'a': [1, 2], 'b': []})
    [{'a': 1}, {'a': 2}]
    """
    keys = set(object_)
    result = [dict(object_)]
    while keys:
        key = keys.pop()
        new_result = []
        for index, record in enumerate(result):
            try:
                value = record[key]
            except KeyError:
                new_result.append(record)
            else:
                if isinstance(value, dict):
                    del record[key]
                    new_value = flatten_nested_objects(
                            value,
                            prefix=key + path_separator,
                            path_separator=path_separator
                    )
                    keys.update(new_value.keys())
                    new_result.append({**new_value, **record})
                elif isinstance(value, list):
                    del record[key]
                    new_records = [
                        flatten_nested_objects(sub_value,
                                               prefix=key + path_separator,
                                               path_separator=path_separator)
                        for sub_value in value
                    ]
                    keys.update(chain.from_iterable(map(dict.keys,
                                                        new_records)))
                    if new_records:
                        new_result.extend({**new_record, **record}
                                          for new_record in new_records)
                    else:
                        new_result.append(record)
                else:
                    new_result.append(record)
        result = new_result
    return result


@singledispatch
def flatten_nested_objects(object_: Serializable,
                           *,
                           prefix: str = '',
                           path_separator: str) -> Object:
    return {prefix[:-len(path_separator)]: object_}


@flatten_nested_objects.register(dict)
def _(object_: Object,
      *,
      prefix: str = '',
      path_separator: str) -> Object:
    result = dict(object_)
    for key in list(result):
        result.update(flatten_nested_objects(result.pop(key),
                                             prefix=(prefix + key
                                                     + path_separator),
                                             path_separator=path_separator))
    return result


@flatten_nested_objects.register(list)
def _(object_: Array,
      *,
      prefix: str = '',
      path_separator: str) -> Object:
    return {prefix[:-len(path_separator)]: list(map(partial(
            flatten_nested_objects,
            path_separator=path_separator),
            object_))}

1
如果我在一个嵌套对象上调用flatten函数,我不会再期望任何子元素存在。我会期望一个单一的顶层可迭代对象,而不是一个可能包含子结构的可迭代对象。我认为你的实现返回一个扁平化项列表,但不能说返回值是扁平的(除非你提供一个已经扁平化的列表,例如:[1,2,3,4,5])。 - acdameli
@abdelgha4:它如何“不能处理JSON对象值为空数组的情况”?您能否提供一个带有预期和实际行为示例的例子? - Azat Ibrakov
1
例如 flatten({'a': [1, 2], 'b': []}) 返回 [],但我期望得到 [{'a': 1}, {'a': 2}] 就像 flatten({'a': [1, 2], 'b': {}}) 的结果一样。 - abdelgha4
@abdelgha4:谢谢,发现得好,希望额外的分支能解决这个问题。 - Azat Ibrakov
我不知道你是否有时间,但我认为这些将是一些很棒的功能:
  • 控制展平的级别。也许可以使用 max_depthkeys_not_to_flatten 参数。
    flatten({'a': [1, 2], 'b': [3, 4], 'c': {'d': [5, 6]}}, max_depth=1, keys_not_to_flatten=['b']) -> [{'a': 1, 'c': "{'d': [5, 6]}", 'b': '[3, 4]'}, {'a': 2, 'c': "{'d': [5, 6]}", 'b': '[3, 4]'}]
  • 布尔参数 split_arrays,用于在当前行为(拆分数组)和将索引添加到键之间切换。
我希望我能够处理这个问题,但对我来说有点高级了 :).
- abdelgha4
显示剩余4条评论

5

如果有其他人也在这里寻找更适合后续编程处理的解决方案:

将列表展平会导致需要处理标题以获取列表长度等信息。我想要一个解决方案,如果有两个列表,每个列表有2个元素,则会生成四行数据,产生每个有效潜在数据行(有关实际示例,请参见下文):

class MapFlattener:

    def __init__(self):
        self.headings = []
        self.rows = []

    def add_rows(self, headings, rows):
        self.headings = [*self.headings, *headings]
        if self.rows:
            new_rows = []
            for base_row in self.rows:
                for row in rows:
                    new_rows.append([*base_row, *row])
            self.rows = new_rows
        else:
            self.rows = rows

    def __call__(self, mapping):
        for heading, value in mapping.items():
            if isinstance(value, Mapping):
                sub_headings, sub_rows = MapFlattener()(value)
                sub_headings = [f'{heading}:{sub_heading}' for sub_heading in sub_headings]
                self.add_rows(sub_headings, sub_rows)
                continue

            if isinstance(value, list):
                self.add_rows([heading], [[e] for e in value])
                continue

            self.add_rows([heading], [[value]])

        return self.headings, self.rows


def map_flatten(mapping):
    return MapFlattener()(mapping)

这样可以创建更符合关系型数据的输出:
In [22]: map_flatten({'l': [1,2]})                                                                                                          
Out[22]: (['l'], [[1], [2]])

In [23]: map_flatten({'l': [1,2], 'n': 7})                                                                                                  
Out[23]: (['l', 'n'], [[1, 7], [2, 7]])

In [24]: map_flatten({'l': [1,2], 'n': 7, 'o': {'a': 1, 'b': 2}})                                                                           
Out[24]: (['l', 'n', 'o:a', 'o:b'], [[1, 7, 1, 2], [2, 7, 1, 2]])

In [25]: map_flatten({'top': {'middle': {'bottom': [0, 1]}, 'ml': ['A', 'B']}, 'l': ['a', 'b']})                                             
Out[25]: 
(['top:middle:bottom', 'top:ml', 'l'],
 [[0, 'A', 'a'],
  [0, 'A', 'b'],
  [0, 'B', 'a'],
  [0, 'B', 'b'],
  [1, 'A', 'a'],
  [1, 'A', 'b'],
  [1, 'B', 'a'],
  [1, 'B', 'b']])

如果您在电子表格等工具中使用CSV并需要处理扁平数据,则此功能特别有用。


我们如何将其扩展到多个级别? - Hemanth
它是递归的,已经可以处理多个级别。我已经编辑了答案并包含了一个示例。 - Paul Whipp

1

我尝试使用BFS方法,只有在val是字典类型时才将(parent,val)存储在队列中。

def flattern_json(d):
    if len(d) == 0:
        return {}
    from collections import deque
    q = deque()
    res = dict()
    for key, val in d.items(): # This loop push the top most keys and values into queue.
        if not isinstance(val, dict):  # If it's not dict
            if isinstance(val, list):  # If it's list then check list values if it contains dict object.
                temp = list()  # Creating temp list for storing the values that we will need which are not dict.
                for v in val:
                    if not isinstance(v, dict):
                        temp.append(v)
                    else:
                        q.append((key, v))  # if it's value is dict type then we push along with parent which is key.
                if len(temp) > 0:
                    res[key] = temp
            else:
                res[key] = val
        else:
            q.append((key, val))
    while q:
        k, v = q.popleft()  # Taking parent and the value out of queue
        for key, val in v.items():
            new_parent = k + "_" + key  # New parent will be old parent_currentval
            if isinstance(val, list):
                temp = list()
                for v in val:
                    if not isinstance(v, dict):
                        temp.append(v)
                    else:
                        q.append((new_parent, v))
                if len(temp) >= 0:
                    res[new_parent] = temp
            elif not isinstance(val, dict):
                res[new_parent] = val
            else:
                q.append((new_parent, val))
    return res

使用给定的JSON,它正在工作,我添加了下划线以展开JSON,而不是使用0 1列表索引。
from pprint import pprint

print(pprint.pprint(flattern_json(d)))

它给出了以下输出:


{'count': 13,
 'virtualmachine_affinitygroup': [],
 'virtualmachine_displayname': 'test-2',
 'virtualmachine_hypervisor': 'KVM',
 'virtualmachine_id': '1082e2ed-ff66-40b1-a41b-26061afd4a0b',
 'virtualmachine_isdynamicallyscalable': False,
 'virtualmachine_name': 'test-2',
 'virtualmachine_nic': [],
 'virtualmachine_nic_id': '3d7f2818-1f19-46e7-aa98-956526c5b1ad',
 'virtualmachine_nic_networkid': 'b4648cfd-0795-43fc-9e50-6ee9ddefc5bd',
 'virtualmachine_nic_traffictype': 'Guest',
 'virtualmachine_securitygroup': [],
 'virtualmachine_securitygroup_id': '9e649fbc-3e64-4395-9629-5e1215b34e58',
 'virtualmachine_securitygroup_name': 'test',
 'virtualmachine_securitygroup_tags': []}

0

我已经阅读了大量不同的方法。这是唯一一个对于复杂嵌套的JSON有效的方法。这个方法是将嵌套的JSON展开并转换为pandas数据框,以便更容易地过滤出您想要的任何元素。

import json
import pprint
import pandas as pd
from flatten_json import flatten

with open('sth.json') as json_file:
  nested_json = json.load(json_file)
nested_json = nested_json["_via_img_metadata"]

out = {}
def flatten(x, name=''):
    if type(x) is dict:
        for a in x:
            flatten(x[a], name + a + '_')
    elif type(x) is list:
        i = 0
        for a in x:
            flatten(a, name + str(i) + '_')
            i += 1
    else:
        out[name[:-1]] = x
    return out

df = pd.Series(flatten(nested_json)).to_frame()

0
def flatten(data, key=None, *, seperator="__"):
    if isinstance(data, list):
        for item in data:
            yield from flatten(item, key)
    elif isinstance(data, dict):
        for k, v in data.items():
            if key:
                new_key = key + seperator + k
                yield from flatten(v, new_key)
            else:
                yield from flatten(v, k) 
    else:
        yield key, data

使用方法:

data = {
    "age": 29,
    "name": "ankit",
    "others": [
        {
            "todo": [{"title": "party"}, {"title": "buy grocceries"}],
            "hobbies": ["gym", "online games"],
        }
    ],
    "skills": ["javascript", "react"],
}

list(flatten(data))
output:
[('age', 29),
 ('name', 'ankit'),
 ('others__todo__title', 'party'),
 ('others__todo__title', 'buy grocceries'),
 ('others__hobbies', 'gym'),
 ('others__hobbies', 'online games'),
 ('skills', 'javascript'),
 ('skills', 'react')]

0
我使用这个简单的函数将数据规范化并转换为JSON格式。它接受列表、字典和元组,并将其扁平化为JSON。
def normalize_data_to_json(raw_data: [list, dict, tuple], parent=""):
    from datetime import datetime
    from decimal import Decimal

    result = {}
    # key name normalise to snake case (single underscore)
    parent = parent.lower().replace(" ", "_") if isinstance(parent, str) else parent
    if isinstance(parent, str) and parent.startswith("__"):
        # if parent has no parent remove double underscore and treat as int if digit else as str
        # treating as int is better if passed data is a list so you output is index based dict
        parent = int(parent.lstrip("_")) if parent.lstrip("_").isdigit() else parent.lstrip("_")

    # handle str, int, float, and decimal.
    # you can easily add more data types as er your data
    if type(raw_data) in [str, int, float, Decimal]:
        result[parent] = float(raw_data) if isinstance(raw_data, Decimal) else raw_data

    # normalise datetime object
    elif isinstance(raw_data, datetime):
        result[parent] = raw_data.strftime("%Y-%m-%d %H:%M:%S")

    # normalise dict and all nested dicts.
    # all nests are joined with double underscore to identify parent key name with it's children
    elif isinstance(raw_data, dict):
        for k, v in raw_data.items():
            k = f'{parent}__{k}' if parent else k
            result.update(normalize_data_to_json(v, parent=k))

    # normalise list and tuple
    elif type(raw_data) in [list, tuple]:
        for i, sub_item in enumerate(raw_data, start=1):
            result.update(normalize_data_to_json(sub_item, f"{parent}__{i}"))

    # any data which did not matched above data types, normalise them using it's __str__
    else:
        result[parent] = str(raw_data)

    return result

-1

以jsonpath格式输出:

def convert(f):
    out = {}
    def flatten(x, name=None):
        if type(x) is dict:
            for a in x:
                val = '.'.join((name, a)) if name else a
                flatten(x[a], val)
        elif type(x) is list:
            for (i, a) in enumerate(x):
                flatten(a, name + f'[{str(i)}]')
        else:
            out[name] = x if x else ""
    flatten(f)
    return out

-2

在这里传递你的字典:

def getKeyValuePair(dic,master_dic = {},master_key = None):
    keys = list(dic.keys())
    for key in keys:
        if type(dic[key]) == dict:
                getKeyValuePair(dic[key],master_dic = master_dic,master_key = key)
        else:
            if master_key == None:
                master_dic[key] = dic[key]
            else:
                master_dic[str(master_key)+'_'+str(key)] = dic[key]

   return master_dic

3
请修正您的缩进,并使代码在通过某些字典后能够正常工作。 - MD Rijwan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接