如何在使用Pyspark的AWS Glue中扁平化嵌套JSON中的数组?

3
我正在尝试将JSON文件压平,以便可以在AWS Glue中加载到PostgreSQL中。我使用PySpark进行操作。使用爬虫,我爬取S3 JSON并生成一个表。然后,我使用ETL Glue脚本来:

  • 读取爬取的表
  • 使用“ Relationalize”功能来压平文件
  • 将动态框架转换为数据帧
  • 尝试“ Explode”请求数据字段

到目前为止的脚本:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")

df0 = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")

df1 = df0.select(dfc_root_table_name)

df2 = df1.toDF()

df2 = df1.select(explode(col('`request.data`')).alias("request_data"))

<then i write df1 to a PostgreSQL database which works fine>

我面临的问题:

“关系化”函数很好用,但请求数据字段变成了bigint,因此“explode”无法使用。

由于数据结构的原因,不能在不先对JSON使用“关系化”函数的情况下执行“explode”。具体错误是:“org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析'explode(request.data)':传递给函数explode的输入应该是数组或映射类型,而不是bigint”

如果我首先将动态框架转换为数据框,则会遇到以下问题:“py4j.protocol.Py4JJavaError:调用o72.jdbc时发生错误。:java.lang.IllegalArgumentException:无法获取struct的JDBC类型…”

我尝试上传分类器,以使数据在抓取过程中被平铺,但AWS证实这不起作用。

我正在尝试规范化的原始文件的JSON格式如下:

- field1
- field2
- {}
  - field3
  - {}
    - field4
    - field5
  - []
    - {}
      - field6
      - {}
        - field7
        - field8
        - {}
          - field9
          - {}
            - field10
2个回答

2
# Flatten nested df  
def flatten_df(nested_df): 
    for col in nested_df.columns:


    array_cols = [c[0] for c in nested_df.dtypes if c[1][:5] == 'array']
    for col in array_cols:
        nested_df =nested_df.withColumn(col, F.explode_outer(nested_df[col]))

    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    if len(nested_cols) == 0:
        return nested_df

    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']

    flat_df = nested_df.select(flat_cols +
                            [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])

    return flatten_df(flat_df)

df=flatten_df(df)

它将用下划线替换所有的点。请注意,它使用explode_outer而不是explode,以包含空值,以防数组本身为null。此函数仅适用于spark v2.4+

还要记住,展开数组将添加更多的重复数据,总行大小将增加。展平结构将增加列大小。简而言之,原始df将水平和垂直地爆炸。这可能会降低后续处理数据的速度。

因此,我的建议是识别与特征相关的数据,并仅将这些数据存储在postgresql中,将原始json文件存储在s3中。


这对于我的大多数JSON文件都有效。但是当结构/数组为NULL时,我会收到一个错误消息:“在list.ordered中没有这样的结构字段style。” 我该如何处理空条件? - Lisa Mathew
可以使用posexplode_outer函数代替explode_outer吗? - Lisa Mathew
不错。第一行 for col in nested_df.columns: 需要被删除。 - Vinay Kolar
这将在数据框中创建重复记录。 - 123

0

一旦您对json列进行了合理化处理,就不需要将其拆分。关系化转换嵌套的JSON为JSON文档最外层的键值对。转换后的数据保留了由点号分隔的嵌套JSON中原始键的列表。

示例:

嵌套的JSON:

{
    "player": {
        "username": "user1",
        "characteristics": {
            "race": "Human",
            "class": "Warlock",
            "subclass": "Dawnblade",
            "power": 300,
            "playercountry": "USA"
        },
        "arsenal": {
            "kinetic": {
                "name": "Sweet Business",
                "type": "Auto Rifle",
                "power": 300,
                "element": "Kinetic"
            },
            "energy": {
                "name": "MIDA Mini-Tool",
                "type": "Submachine Gun",
                "power": 300,
                "element": "Solar"
            },
            "power": {
                "name": "Play of the Game",
                "type": "Grenade Launcher",
                "power": 300,
                "element": "Arc"
            }
        },
        "armor": {
            "head": "Eye of Another World",
            "arms": "Philomath Gloves",
            "chest": "Philomath Robes",
            "leg": "Philomath Boots",
            "classitem": "Philomath Bond"
        },
        "location": {
            "map": "Titan",
            "waypoint": "The Rig"
        }
    }
}

在整理后展开的 JSON:

{
    "player.username": "user1",
    "player.characteristics.race": "Human",
    "player.characteristics.class": "Warlock",
    "player.characteristics.subclass": "Dawnblade",
    "player.characteristics.power": 300,
    "player.characteristics.playercountry": "USA",
    "player.arsenal.kinetic.name": "Sweet Business",
    "player.arsenal.kinetic.type": "Auto Rifle",
    "player.arsenal.kinetic.power": 300,
    "player.arsenal.kinetic.element": "Kinetic",
    "player.arsenal.energy.name": "MIDA Mini-Tool",
    "player.arsenal.energy.type": "Submachine Gun",
    "player.arsenal.energy.power": 300,
    "player.arsenal.energy.element": "Solar",
    "player.arsenal.power.name": "Play of the Game",
    "player.arsenal.power.type": "Grenade Launcher",
    "player.arsenal.power.power": 300,
    "player.arsenal.power.element": "Arc",
    "player.armor.head": "Eye of Another World",
    "player.armor.arms": "Philomath Gloves",
    "player.armor.chest": "Philomath Robes",
    "player.armor.leg": "Philomath Boots",
    "player.armor.classitem": "Philomath Bond",
    "player.location.map": "Titan",
    "player.location.waypoint": "The Rig"
}

因此,在您的情况下,request.data已经是从请求列展开的新列,并且其类型被Spark解释为bigint。
参考:使用AWS Glue关系转换简化/查询嵌套JSON

真的,但问题在于JSON结构(request.data)中有一个数组需要展开。否则,它只会返回1的bigint(即省略实际数据),这是不正确的。否则rationalize工作得很好。 - charlesperry
@charlesperry,你说得对。Relationalize 只适用于 JSON 的最外层,这应该在文档中明确说明。我仍在努力找出将具有 5 层嵌套数组和结构的 JSON 文件关联化的最佳方法。 - ruifgmonteiro
@ruifgmonteiro,你解决了这个问题吗?我们正在尝试处理具有嵌套数组的对象的有理化。 - Sigex
@Sigex 我们最终采用了基于Spark SQL的不同方法,其中我们创建一个基于预期模式的表,然后使用SQL来应用所需的转换。 这一方法运行良好,比起我们最初的方法,该方法要简单得多,因为我们不需要通过层次结构递归地应用转换。 总之,由于缺乏文档,我们决定不再投入更多精力来使用Glue库,并且我们认为使用Spark库更有益,因为我们可以轻松地将其移植到另一个平台而不必彻底更改代码。 - ruifgmonteiro

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接