我有一个大型的嵌套的NDJ(按新行分隔的JSON)文件,需要将其读入单个spark dataframe并保存为Parquet格式。为了呈现模式,我使用以下函数:
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName = if (prefix == null) f.name else (prefix + "." + f.name)
f.dataType match {
case st: StructType => flattenSchema(st, colName)
case _ => Array(col(colName))
}
})
}
在通过以下方式读取返回的数据帧上:
val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)
我已经将其切换为 val df = spark.read.json(path)
,以便仅适用于NDJs而非多行JSON - 但出现了相同的错误。
这导致工作进程中出现了内存不足的错误:java.lang.OutOfMemoryError: Java heap space
。
我已更改JVM内存选项和Spark执行程序/驱动程序选项,但无济于事。
有没有一种方法可以流式传输文件,展开模式并逐步添加到数据帧中? JSON的某些行包含前面条目的新字段...因此需要稍后填充这些字段。
wholeTextFiles
引起的java.lang.OutOfMemoryError
问题? - user6022341val df = spark.read.json(path)
。然而,这仍然导致了Java OOM错误。我尝试使用-xmx选项增加内存,并且还增加了驱动程序/执行器节点的内存。JSON文件大小约为1G,我的笔记本电脑上有16GB可用于测试。 - Anisotropic