将JSON字符串列拆分为多个列

6
我正在寻找一种通用的解决方案,以从JSON字符串列中提取所有的JSON字段作为列。
df =  spark.read.load(path)
df.show()

'path'路径下的文件格式为parquet。

样例数据

|id | json_data
| 1 | {"name":"abc", "depts":["dep01", "dep02"]}
| 2 | {"name":"xyz", "depts":["dep03"],"sal":100}
| 3 | {"name":"pqr", "depts":["dep02"], "address":{"city":"SF","state":"CA"}}

期望输出结果

|id | name    | depts              | sal | address_city | address_state
| 1 | "abc"   | ["dep01", "dep02"] | null| null         | null
| 2 | "xyz"   | ["dep03"]          | 100 | null         | null
| 3 | "pqr"   | ["dep02"]          | null| "SF"         | "CA"

我知道可以通过创建已定义模式的StructType并使用'from_json'方法来提取列。但是这种方法需要手动定义模式。
val myStruct = StructType(
  Seq(
    StructField("name", StringType),
    StructField("depts", ArrayType(StringType)),
    StructField("sal", IntegerType)
  ))

var newDf = df.withColumn("depts", from_json(col("depts"), myStruct))

有没有更好的方法在不手动定义架构的情况下展开JSON列? 在所提供的示例中,我可以看到可用的JSON字段。 但实际上,我无法遍历所有行以查找所有字段。

因此,我正在寻找一种解决方案,可以将所有字段拆分为列,而无需指定列的名称或类型。


如果数据是纯的JSON多行,则可以自动实现模式。 - Lamanus
它是一个纯的 JSON 列,但并非所有行都有所有字段。就像我示例中的第一行缺少“sal”字段。 - Munesh
3个回答

3
如果是一个CSV文件,且仅有一列数据是JSON格式,您可以使用以下解决方案。
val csvDF = spark.read.option("delimiter", "|").option("inferSchema", true).option("header", true).csv("test.csv")
val rdd = csvDF.select(" json_data").rdd.map(_.getString(0))
val ds = rdd.toDS
val jsonDF = spark.read.json(ds)
val jsonDFWithID = jsonDF.withColumn("id", monotonically_increasing_id())
val csvDFWithID = csvDF.select($"id ").withColumn("id", monotonically_increasing_id())
val joinDF = jsonDFWithID.join(csvDFWithID, "id").drop("id")

这是最终数据框的样子。
scala> joinDF.printSchema()
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- state: string (nullable = true)
 |-- depts: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- name: string (nullable = true)
 |-- sal: long (nullable = true)
 |-- id : double (nullable = true)

如果是一个JSON文件,下面的解决方案将会有效。对于我来说,inferSchema非常完美地运作。

JSON文件

~/Downloads ▶ cat test.json
{"id": 1, "name":"abc", "depts":["dep01", "dep02"]},
{"id": 2, "name":"xyz", "depts" :["dep03"],"sal":100}

代码

scala> scc.read.format("json").option("inerSchema", true).load("Downloads/test.json").show()
+--------------+---+----+----+
|         depts| id|name| sal|
+--------------+---+----+----+
|[dep01, dep02]|  1| abc|null|
|       [dep03]|  2| xyz| 100|
+--------------+---+----+----+

我的数据/文件格式不是JSON。只有一个列是JSON字符串。 - Munesh
1
@Munesh 我已经修改了答案。这是你想要的吗? - Gaurang Shah
谢谢Gaurang Shah。你的方法帮助了我解决问题。这种方法在嵌套的JSON中不起作用,而monotonically_increasing_id也不起作用,因为它不是连续的,因此连接只返回前几行。 - Munesh

1
假设json_datamap类型(如果不是,您可以随时将其转换为map),您可以使用getItem
df = spark.createDataFrame([
    [1, {"name": "abc", "depts": ["dep01", "dep02"]}],
    [2, {"name": "xyz", "depts": ["dep03"], "sal": 100}]
],
    ['id', 'json_data']
)

df.select(
    df.id, 
    df.json_data.getItem('name').alias('name'), 
    df.json_data.getItem('depts').alias('depts'), 
    df.json_data.getItem('sal').alias('sal')
).show()

+---+----+--------------+----+
| id|name|         depts| sal|
+---+----+--------------+----+
|  1| abc|[dep01, dep02]|null|
|  2| xyz|       [dep03]| 100|
+---+----+--------------+----+

一种更动态的提取列的方法:
cols = ['name', 'depts', 'sal']
df.select(df.id, *(df.json_data.getItem(col).alias(col) for col in cols)).show()

我无法在变量“cols”中指定列名,因为我不知道JSON中所有可用字段。 - Munesh

1
基于@Gaurang Shah的答案,我实现了一个处理嵌套JSON结构并解决使用monotonically_increasing_id(非顺序)的问题的解决方案。
在这个方法中,“populateColumnName”函数递归检查StructType列并填充列名。
“renameColumns”函数通过将“.”替换为“_”来重命名列以识别嵌套的json字段。
“addIndex”函数向数据框添加索引,在解析JSON列后加入数据框。
def flattenJSON(df : DataFrame, columnName: String) : DataFrame = {

    val indexCol = "internal_temp_id"

    def populateColumnName(col : StructField) : Array[String] = {
        col.dataType match {
          case struct: StructType => struct.fields.flatMap(populateColumnName).map(col.name + "." + _)
          case rest         => Array(col.name)
        }
    }

    def renameColumns(name : String) : String = {
        if(name contains ".") {
            name + " as " + name.replaceAll("\\.", "_")
        }
        else name
    }

    def addIndex(df : DataFrame) : DataFrame = {

        // Append "rowid" column of type Long
        val newSchema = StructType(df.schema.fields ++ Array(StructField(indexCol, LongType, false)))

        // Zip on RDD level
        val rddWithId = df.rdd.zipWithIndex
        // Convert back to DataFrame
        spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
    }

    val dfWithID = addIndex(df)

    val jsonDF = df.select(columnName)

    val ds = jsonDF.rdd.map(_.getString(0)).toDS
    val parseDF = spark.read.option("inferSchema",true).json(ds)

    val columnNames = parseDF.schema.fields.flatMap(populateColumnName).map(renameColumns)

    var resultDF = parseDF.selectExpr(columnNames:_*)

    val jsonDFWithID = addIndex(resultDF)

    val joinDF = dfWithID.join(jsonDFWithID, indexCol).drop(indexCol)

    joinDF
}

val res = flattenJSON(jsonDF, "address")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接