Spark >= 2.4
如果需要的话,可以使用schema_of_json
函数确定模式(请注意,这假设任意行是模式的有效代表)。
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
Spark >= 2.1
您可以使用 from_json
函数:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark >= 1.6
您可以使用get_json_object
函数,该函数接受一个列和一个路径参数:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
并将字段提取为单独的字符串,可以进一步转换为期望的类型。
使用点语法表示path
参数,前面带有$.
表示文档根(因为上面的代码使用了字符串插值,所以需要转义$
,因此为$$.
)。
Spark <= 1.5:
目前是否可能实现?
据我所知,目前无法直接实现。你可以尝试类似于这样的方法:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
我假设blob
字段无法在JSON中表示。否则您可以省略分割和连接:
我认为blob
字段不可能用JSON来表示,否则您可以省略拆分和合并:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
另一种替代方案(更便宜,但更加复杂)是使用UDF来解析JSON并输出一个struct
或map
列。例如像这样的内容:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
parsed.printSchema
key
是唯一标识符吗? - zero323