将嵌套的MongoDB导入到Pandas

5
我有一个 MongoDB 中嵌套层次较深的文档集合,想要将其展开并导入到 Pandas 中。其中有一些嵌套字典,但也有一组字典列表,我希望将其转换为列(详见下面的示例)。
我已经有一个用于较小批量文档的函数。但是解决方案(我在这个问题的回答中找到它)使用了 json。 json.loads 操作的问题是,在从 Collection 中选择更多数据时会因为 MemoryError 而失败。
我尝试过许多建议使用其他 json 解析器(例如 ijson)的解决方案,但出于不同的原因,它们都没有解决我的问题。如果我想通过 json 保持转换,唯一剩下的方法就是将较大的选择分成更小的文档组,并迭代解析。
此时我想,也就是我主要的问题——也许有一种更聪明的方法来完成展开而不需要通过 json 直接在 MongoDB 或 Pandas 中进行操作或以某种组合方式?
这是缩短后的示例 Doc:
{
  '_id': ObjectId('5b40fcc4affb061b8871cbc5'),
  'eventId': 2,
  'sId' : 6833,
  'stage': {
    'value': 1,
    'Name': 'FirstStage'
  },
  'quality': [
    {
      'type': {
        'value': 2,
        'Name': 'Color'
      },
      'value': '124'
    },
    {
      'type': {
        'value': 7,
        'Name': 'Length'
      },
      'value': 'Short'
    },
    {
      'type': {
        'value': 15,
        'Name': 'Printed'
      }
    }
}

这是一个成功的数据框架表示方式(为了易读性,我跳过了“_id”和“sId”列):
    eventId    stage.value    stage.name    q_color    q_length    q_printed
1   2          1              'FirstStage'  124        'Short'     1 

目前我的代码(遇到了内存问题-请参阅上文):

def load_events(filter = 'sId', id = 6833, all = False):
  if all:
    print('Loading all events.')
    cursor = events.find()
  else:
    print('Loading events with %s equal to %s.' %(filter, id))
    print('Filtering...')
    cursor = events.find({filter : id})

  print('Loading...')
  l = list(cursor)

  print('Parsing json...')
  sanitized = json.loads(json_util.dumps(l))

  print('Parsing quality...')
  for ev in sanitized:
    for q in ev['quality']:
        name = 'q_' + str(q['type']['Name'])
        value = q.pop('value', 1)
        ev[name] = value
    ev.pop('quality',None)

  normalized = json_normalize(sanitized)

  df = pd.DataFrame(normalized)

  return df

变量 events 未定义。 - Dr Fabio Gori
1个回答

4
您不需要使用json解析器将嵌套结构转换。只需从记录列表创建您的数据帧即可:
df = DataFrame(list(cursor))

然后使用pandas来解压缩您的列表和字典:

import pandas
from itertools import chain
import numpy

df = pandas.DataFrame(t)
df['stage.value'] = df['stage'].apply(lambda cell: cell['value'])
df['stage.name'] = df['stage'].apply(lambda cell: cell['Name'])
df['q_']= df['quality'].apply(lambda cell: [(m['type']['Name'], m['value'] if 'value' in m.keys() else 1) for m in cell])
df['q_'] = df['q_'].apply(lambda cell: dict((k, v) for k, v in cell))
keys = set(chain(*df['q_'].apply(lambda column: column.keys())))
for key in keys:
    column_name = 'q_{}'.format(key).lower()
    df[column_name] = df['q_'].apply(lambda cell: cell[key] if key in cell.keys() else numpy.NaN) 
df.drop(['stage', 'quality', 'q_'], axis=1, inplace=True)

我使用三个步骤来拆解嵌套数据类型。首先,使用名称和值创建一组平铺的元组列表。第二步是基于元组创建字典,使用元组中的第一个位置作为键,第二个位置作为值。然后,使用集合提取所有现有属性名。每个属性都通过循环获得一个新列。在循环内部,将每对的值映射到相应的列单元格。


谢谢,这差不多是我在寻找的。但还有一个问题,它在上面显示的数据中并不明显。在“quality”下有很多可能的条目(大约100个)。文档总是只有0-10个小选择。如果文档中没有“q_”,则应在相应列的相应字段中出现NaN。 所以1)我想知道,您的解决方案是否会抛出KeyError? 2)我可以硬编码每一个,但那似乎是很多工作,也不太优雅。 - J_Scholz
PS:由于这似乎是一个全新的角度,我将开启一个后续问题,并将您的答案标记为已接受。谢谢! - J_Scholz
我更新了我的回答。现在它应该更适合你的需求。也许使用Pymongo查询来过滤所有键会更快。 - Viktor
哦,是的,它能工作了,非常感谢!我还尝试了 Pymongo 查询,但使用您的“链式魔法”要快得多。;-) - J_Scholz
变量t未定义。 - Dr Fabio Gori

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接