在Scala中从嵌套的JSON文件创建Spark DataFrame

3
我有一个长这样的JSON文件:
{
"group" : {},
"lang" : [ 
    [ 1, "scala", "functional" ], 
    [ 2, "java","object" ], 
    [ 3, "py","interpreted" ]
]
}

我尝试使用以下代码创建数据框:
val path = "some/path/to/jsonFile.json"
val df = sqlContext.read.json(path)
df.show()

当我运行这个程序时,我得到了以下结果。
df: org.apache.spark.sql.DataFrame = [_corrupt_record: string]

我们如何根据“lang”关键字的内容创建一个df?我不关心group{},我只需要从“lang”中取出数据并应用以下情况类:
case class ProgLang (id: Int, lang: String, type: String )

我已经阅读了这篇文章在Apache Spark中读取JSON - “corrupt_record”,并且理解了每个记录都需要换行,但是在我的情况下,我无法更改文件结构。

2个回答

7

json格式不正确。 sqlContextjson API 读取时会将其视为损坏的记录。正确的形式为:

{"group":{},"lang":[[1,"scala","functional"],[2,"java","object"],[3,"py","interpreted"]]}

假设您已经有一个文件(“/home/test.json”)中包含需要的数据,那么您可以使用以下方法来获取所需的 dataframe

import org.apache.spark.sql.functions._
import sqlContext.implicits._

val df = sqlContext.read.json("/home/test.json")

val df2 = df.withColumn("lang", explode($"lang"))
    .withColumn("id", $"lang"(0))
    .withColumn("langs", $"lang"(1))
    .withColumn("type", $"lang"(2))
    .drop("lang")
    .withColumnRenamed("langs", "lang")
    .show(false)

你应该拥有

+---+-----+-----------+
|id |lang |type       |
+---+-----+-----------+
|1  |scala|functional |
|2  |java |object     |
|3  |py   |interpreted|
+---+-----+-----------+

更新

如果您不想像下面评论中提到的那样更改输入json格式,您可以使用wholeTextFiles来读取json文件,并按照下面的方式进行解析

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val readJSON = sc.wholeTextFiles("/home/test.json")
  .map(x => x._2)
  .map(data => data.replaceAll("\n", ""))

val df = sqlContext.read.json(readJSON)

val df2 = df.withColumn("lang", explode($"lang"))
  .withColumn("id", $"lang"(0).cast(IntegerType))
  .withColumn("langs", $"lang"(1))
  .withColumn("type", $"lang"(2))
  .drop("lang")
  .withColumnRenamed("langs", "lang")

df2.show(false)
df2.printSchema

它应该会给你上面的dataframe和下面的schema

root
 |-- id: integer (nullable = true)
 |-- lang: string (nullable = true)
 |-- type: string (nullable = true)

Ramesh,我应该更清楚地表达,我无法更改我的JSON文件格式,它只包含一个JSON文档,即之前共享的数据,并且其中有换行符,而在你的情况下,所有内容都在一行中。 - devtest13
你的问题中贴出的格式是否相同?那么如何以相同的格式添加另一条记录呢? - Ramesh Maharjan
是的,在 .json 文件中,格式保持不变,但是 "lang" : [4, "ruby","interpreted"] 这部分会得到新数据。 - devtest13
我觉得我离成功又近了一步,现在我有一个带有组和语言模式的数据框。root |-- lang: array (nullable = true) | |-- element: array (containsNull = true) | | |-- element: string (containsNull = true)以下是代码: val readJSON = sc.wholeTextFiles("'home/data.json").map(x => x._2).map(data => data.replaceAll("""[\n]+""", " ")) val df = sqlContext.read.json(readJSON) df.printSchema - devtest13
是的,我也在考虑使用wholeTextFiles。你说得对。之后你可以直接使用我上面的解决方案 :) 创建df2的部分。 - Ramesh Maharjan
我已经更新了我的回答 :) 请检查。 - Ramesh Maharjan

2

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接