使用Spark和Scala读取文本文件中的JSON

3

我有一个文本文件,其中包含类似以下JSON数据的内容:

{
  "element" : value,
  "id" : value,
  "total" : []
}
{
  "element" : value,
  "id" : value,
  "total: []
}

所有JSON都是以换行符分隔的。

我正在尝试将所有文本文件的数据加载到临时视图中:

sqlContext.read.textFiles("/path").createOrReplaceTempView("result")

val data = sqlContext.sql("select * from result").collect()

结果:

[{"element" : value,"id" : value,"total" : [] }]
[{"element" : value,"id" : value, "total" : []}]

我需要提取与id相关的总数。

在Spark中有没有处理这个问题的方法?


你能否在“result”视图中显示数据时,同时添加你所得到的内容? - Akash Sethi
3个回答

5

使用Spark SQL时,每行都必须包含一个独立、自包含的有效JSON数据,否则计算将失败。

但是你可以尝试这个方法:

spark.read.json(spark.sparkContext.wholeTextFiles("path to json").values) 

或者

spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json("path to json")

这应该将json转换为数据框。

这个 .values 是什么意思?你是指 JSON 里面的值吗? - abhi5800
spark.sparkContext.wholeTextFiles("路径到json")将返回一个RDD。从那个中选择RDD的值.values。此外,我已经更新了适用于spark 2.2的选项。 - Avishek Bhattacharya
谢谢,老兄!它像魔法一样奏效了.. :) :) (2.2版本有所帮助..) - abhi5800

1
给定输入文件,其中包含json数据。
{
  "element" : value,
  "id" : value,
  "total" : []
}
{
  "element" : value,
  "id" : value,
  "total: []
}

这不是一个有效的json,不能转换为dataframe,因此您需要将数据转换为有效的spark可读的json格式。
val rdd = sc.wholeTextFiles("path to the json file")

val validJsonRdd = rdd.flatMap(_._2.replace(" ", "").replace("\n", "").replace(":value", ":\"value\"").replace("}{", "}\n{").split("\n"))

上述步骤仅适用于元素和ID字段中没有带有引号的值字符串。否则,您可以根据需要进行修改。
下一步是使用sqlcontext将其转换为数据框。
 val df = sqlContext.read.json(validJsonRdd)

应该得到的结果是

+-------+-----+-----+
|element|id   |total|
+-------+-----+-----+
|value  |value|[]   |
|value  |value|[]   |
+-------+-----+-----+

现在你应该能够选择相应的idtotals并进行操作。
希望这个答案对你有所帮助。

1
@这对于小数据集来说是可行的。但如果数据集很大,将所有数据转换为有效的JsonRDD将需要很长时间。 - abhi5800
由于我的文本文件已经定义了JSON对象,我不能直接读取它们吗? - abhi5800
你可以这样做,但是你需要编写一个JSON解析器,因为Spark不支持你所拥有的JSON类型。 - philantrovert
1
@abhi5800,你在你的大数据集中试过了吗?你有多少数据? - Ramesh Maharjan
1
@RameshMaharjan 是的,我在1.8GB的数据集上尝试了这种方法。它花费了很长时间,我不得不在中途停止它。 :( - abhi5800
显示剩余2条评论

0
此外,由于需要一些时间来理解它,当查询嵌套在totals中的内容时,您可能需要使用"explode"方法:
Dataset<Row> socials = sparkSession
            .read()
            .option("multiLine", true)
            .option("mode", "PERMISSIVE")
            .json(<path to file>).cache();

socials.select(org.apache.spark.sql.functions.explode(socials.col("total")).as("t")).where("t.<some nested column under total> = 'foo'").toJSON().collectAsList();

这是针对Java Spark的,但希望explode方法能有所帮助。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接