将大量的JSON文件读入Spark Dataframe

5

我有一个大型的嵌套的NDJ(按新行分隔的JSON)文件,需要将其读入单个spark dataframe并保存为Parquet格式。为了呈现模式,我使用以下函数:

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
        schema.fields.flatMap(f => {
          val colName = if (prefix == null) f.name else (prefix + "." + f.name)
          f.dataType match {
            case st: StructType => flattenSchema(st, colName)
            case _ => Array(col(colName))
          }
        })
  }

在通过以下方式读取返回的数据帧上:

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

我已经将其切换为 val df = spark.read.json(path),以便仅适用于NDJs而非多行JSON - 但出现了相同的错误。

这导致工作进程中出现了内存不足的错误:java.lang.OutOfMemoryError: Java heap space

我已更改JVM内存选项和Spark执行程序/驱动程序选项,但无济于事。

有没有一种方法可以流式传输文件,展开模式并逐步添加到数据帧中? JSON的某些行包含前面条目的新字段...因此需要稍后填充这些字段。

2个回答

2

没有解决办法。问题出在JVM对象限制上。最终我使用了Scala的JSON解析器,并手动构建了数据框架。


0

你可以用多种方式来实现这个。

首先,在读取时,你可以为数据帧提供一个模式来读取 JSON,或者允许 Spark 自己推断出模式。

一旦 JSON 转换为数据帧,你可以按照以下方式对其进行展开。

a. 使用 dataframe 的 explode() 方法 - 对其进行展开。 b. 使用 Spark SQL 并使用 . 操作符访问嵌套字段。你可以在这里找到示例 here

最后,如果你想要向数据帧添加新列 a. 第一种选择,使用 withColumn() 方法是一种方法。但是这将针对每个添加的新列和整个数据集进行操作。 b. 使用 SQL 从现有数据生成新的数据帧 - 这可能是最简单的方法。 c. 最后,使用 map,然后访问元素,获取旧模式,添加新值,创建新模式,最后获取新数据帧 - 如下所示

使用withColumn会对整个RDD进行操作。因此,通常不建议为要添加的每一列使用该方法。有一种方法可以在map函数内部处理列及其数据。由于一个map函数在这里完成了工作,因此添加新列及其数据的代码将并行执行。

a. 可以根据计算收集新值

b. 将这些新列值添加到主RDD中,如下所示

val newColumns: Seq[Any] = Seq(newcol1,newcol2)
Row.fromSeq(row.toSeq.init ++ newColumns)

这里的row是map方法中行的引用。

c. 创建以下新模式

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType))

d. 将其添加到旧模式中

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType)

e. 创建新的数据框并添加新列

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema)

它如何解决由wholeTextFiles引起的java.lang.OutOfMemoryError问题? - user6022341
我正在回答“是否有一种方法可以流式传输文件,扁平化模式,并逐步将其添加到数据帧中? JSON的某些行包含前面实体的新字段...因此那些字段稍后需要填充。” 我没有看到关于内存问题解决的问题,所以给了他多种方法。 - Ramzy
如果NDJ是JSONL,则OP不应使用wholeTextFiles。如果不是这样,这并没有帮助。 - user6022341
所有可以使用textFile()实现的功能都可以使用wholeTextFile()来实现,但性能较差。反之则不成立-由于多行场景的存在。由于这里是NDJ,因此最好的替代方案是摆脱wholetext文件以及上述提到的其他替代方案。如果我漏掉了什么,请告诉我。 - Ramzy
我曾使用wholeTextFile来处理多行JSON,现在改用正常的val df = spark.read.json(path)。然而,这仍然导致了Java OOM错误。我尝试使用-xmx选项增加内存,并且还增加了驱动程序/执行器节点的内存。JSON文件大小约为1G,我的笔记本电脑上有16GB可用于测试。 - Anisotropic
你提到了执行者的OOM错误。而且你也说文件很大。我的建议是先尝试使用小数据集使逻辑正常工作。然后再尝试在实际文件上操作。回到你上面的两个矛盾的陈述 - 如果你在读取文件时遇到麻烦,那么你在驱动程序本身就会收到OOM错误。另一方面,如果是在worker节点上,那么读取是好的,错误是由于执行者执行的某些逻辑导致的。 - Ramzy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接