Spark处理JSON的异常情况

5

我正在尝试在读取json文件时捕获/忽略解析错误

val DF = sqlContext.jsonFile("file")

有几行不是有效的json对象,但数据太大了,无法逐个查看(约1TB)

我遇到了使用import scala.util.Tryin.map(a => Try(a.toInt))进行映射的异常处理,参考: 如何在spark map()函数中处理异常?

当使用函数sqlContext.jsonFile读取json文件时,如何捕获异常?

谢谢!


在读取json时,您可以使用.option("mode","DROPMALFORMED")来忽略错误的记录,或者使用.option("mode","FAILFAST")来通过有用的异常中断作业。请参阅文档以获取更多详细信息 - ecoe
1个回答

2

很遗憾,在这里你没有什么好运气。DataFrameReader.json在内部使用,几乎是全有或全无的。如果输入包含格式不正确的行,你必须手动过滤它们。基本解决方案可能如下:

import scala.util.parsing.json._

val df = sqlContext.read.json(
    sc.textFile("file").filter(JSON.parseFull(_).isDefined)
)

由于上述验证较为耗费资源,您可能更倾向于完全放弃jsonFile / read.json,直接使用解析后的JSON行。


1
我可以建议使用 isDefined 而不是 match { ... } 吗? - Daniel Darabos
尽管放心。谢谢。 - zero323
谢谢@zero323!虽然速度慢了很多,但它能正常工作,让任务在晚上运行也没有问题 :) - Kim Ngo
你可以尝试不同的解析库。我认为Spark内部使用的是Jackson。使用mapPartitions而不是map也可以提高性能。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接