使用Spark DataFrames如何查询JSON数据列?

71

我有一个Cassandra表格,为了简单起见,它看起来像这样:

key: text
jsonData: text
blobData: blob

我可以使用Spark和spark-cassandra-connector创建一个基本的数据框,方法如下:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

我正在努力将JSON数据扩展到其基础结构。最终,我希望能够根据JSON字符串中的属性进行过滤,并返回blob数据。类似于jsonData.foo =“bar”,然后返回blobData。这目前是否可行?


key 是唯一标识符吗? - zero323
是的,key是该表的主键。 - JDesuv
请看这个优秀的示例,了解如何展开嵌套的JSON:https://bigdataprogrammers.com/read-nested-json-in-spark-dataframe/ - Apurva Singh
5个回答

111

Spark >= 2.4

如果需要的话,可以使用schema_of_json函数确定模式(请注意,这假设任意行是模式的有效代表)。

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

Spark >= 2.1

您可以使用 from_json 函数:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

Spark >= 1.6

您可以使用get_json_object函数,该函数接受一个列和一个路径参数:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取为单独的字符串,可以进一步转换为期望的类型。

使用点语法表示path参数,前面带有$.表示文档根(因为上面的代码使用了字符串插值,所以需要转义$,因此为$$.)。

Spark <= 1.5:

目前是否可能实现?

据我所知,目前无法直接实现。你可以尝试类似于这样的方法:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设blob字段无法在JSON中表示。否则您可以省略分割和连接:

我认为blob字段不可能用JSON来表示,否则您可以省略拆分和合并:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

另一种替代方案(更便宜,但更加复杂)是使用UDF来解析JSON并输出一个structmap列。例如像这样的内容:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)

在Spark 2.2.0中,使用StructTypeto_json中不起作用。List fields = new ArrayList<>(2); fields.add(DataTypes.createStructField("a", DataTypes.StringType, false)); StructType schema = DataTypes.createStructType(fields); g4.withColumn("dd", functions.to_json(functions.col("a"),schema) ).show(); - Yashwanth Kambala
如果您正在寻找一种更全面的方法来处理JSON数据中的模式变化,那么您应该使用spark.read.json()。我在我的答案中提供了详细信息和完整示例。 - Nick Chammas

41

zero323的回答非常详细,但遗漏了一种在Spark 2.1+中可用、比使用schema_of_json()更简单、更健壮的方法:

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

以下是Python的等效代码:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

schema_of_json()存在的问题,正如zero323所指出的那样,它检查单个字符串并从中推导出模式。如果您有具有不同模式的JSON数据,则从schema_of_json()返回的模式将不反映合并所有JSON数据模式时获得的模式。使用from_json()解析该数据将产生许多null或空值,其中schema_of_json()返回的模式与数据不匹配。

通过使用Spark从JSON字符串RDD推导出全面的JSON模式的能力,我们可以保证可以解析所有JSON数据。

示例:schema_of_json()spark.read.json()之间的区别

以下是一个示例(在Python中,Scala的代码非常相似),以说明使用schema_of_json()从单个元素推导模式和使用spark.read.json()从所有数据推导模式之间的差异。

>>> df = spark.createDataFrame(
...     [
...         (1, '{"a": true}'),
...         (2, '{"a": "hello"}'),
...         (3, '{"b": 22}'),
...     ],
...     schema=['id', 'jsonData'],
... )

{{a}}在一行中具有布尔值,在另一行中具有字符串值。合并后的{{a}}模式将其类型设置为字符串。 {{b}}将是一个整数。
让我们看看不同的方法如何比较。首先,是使用{{schema_of_json()}}方法:
>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

正如您所看到的,我们得出的JSON模式非常有限。""a": "hello""不能解析为布尔值并返回null,而"b": 22则被丢弃,因为它不在我们的模式中。
现在使用spark.read.json()
>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

在这里,我们保留了所有数据,并使用了一个全面的模式来解释所有数据。为了匹配 "a": "hello" 的模式,"a": true 被转换为字符串。

使用 spark.read.json() 的主要缺点是,Spark 将扫描所有数据以推导模式。根据您拥有的数据量,这个开销可能是相当大的。如果您知道所有的 JSON 数据具有一致的模式,则可以直接针对单个元素使用 schema_of_json()。如果您的模式存在变化但不想扫描所有数据,则可以在调用 spark.read.json() 时将 samplingRatio 设置为小于 1.0 的值,以查看数据子集。

这里是 spark.read.json() 的文档: Scala API / Python API


2
非常感谢!这是一个非常优雅的解决方案,应该被接受为这个问题的解决方案。 - Magicbeanbuyer
这是一个非常全面的解决方案!非常有帮助! - Fisher Coder

4
from_json函数正是您需要的。您的代码将类似于以下内容:
val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

//You can define whatever struct type that your json states
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))

1
底层的JSON字符串为:
"{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}";

以下是过滤JSON并将所需数据加载到Cassandra的脚本。
  sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
            .write.format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
            .mode(SaveMode.Append)
            .save()

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接