Spark Parquet模式演化

3
我有一个分区的hdfs parquet位置,其中不同的分区具有不同的模式。 比如第一个分区有5个列,第二个分区有4个列。现在我尝试读取基本的Parquet路径,然后过滤第二个分区。 这会给我一个DF中的5列,即使我在第二个分区的Parquet文件中只有4列。 当我直接读取第二个分区时,它会正确地给出4列。如何解决这个问题。
2个回答

4

在读取parquet文件时,您可以指定所需的模式(4个列)

  • 然后,Spark仅读取包含在模式中的字段,如果数据中不存在该字段,则将返回null

示例:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

val sch=new StructType().add("i",IntegerType).add("z",StringType)
spark.read.schema(sch).parquet("<parquet_file_path>").show()

//here i have i in my data and not have z field
//+---+----+
//|  i|   z|
//+---+----+
//|  1|null|
//+---+----+

我正在应用自己的模式,但它只有第二个分区中可用的4列。我正在读取具有多个分区的基本目录。我的DF模式具有5个列作为分区1文件具有5个模式。即使仅过滤分区2,我的DF仍然具有5列。 - akm
在读取时,您是否添加了 .schema(<new_schema>) 然后读取 Parquet 目录?如果我们不指定 .schema 选项,Parquet 将仅为指定的模式拉取数据,否则它将读取所有字段! - notNull
不,我在读取时不会添加模式。它会推断出模式,然后我将其转换为RDD,进行一些处理,最后创建一个带有模式的数据框。在这里,我只提供了4列,但是RDD已经有5列。因此,如果我有100列,则数据将从第5列移动。 - akm

1
我很愿意帮助您,但我不确定您实际想要达成什么目标。您的意图是什么?
如果您想读取包含所有分区的Parquet文件,并只获取两个分区都具有的列,则可能需要使用“mergeSchema”读取选项。
像Protocol Buffer、Avro和Thrift一样,Parquet也支持模式演化。用户可以从简单的模式开始,并根据需要逐渐添加更多列到模式中。通过这种方式,用户可能会得到多个具有不同但相互兼容的模式的Parquet文件。现在,Parquet数据源能够自动检测到这种情况并合并所有这些文件的模式。
由于模式合并是一个相对昂贵的操作,在大多数情况下并不必要,因此我们从1.5.0开始默认关闭了它。您可以在读取Parquet文件时(如下面的示例所示)通过设置数据源选项mergeSchema为true或设置全局SQL选项spark.sql.parquet.mergeSchema为true来启用它。
参见spark documentation

所以很有趣的是你使用的Spark版本以及如何设置属性spark.sql.parquet.mergeSchema(Spark设置)和mergeSchema(客户端)


是的,我只想要所有分区中都共有的模式。第1个分区有5列,第2个分区有4列,因此当我将该文件读取为DF时,我应该只获取4列,因为第1个分区的第5列在第2个分区中不可用,但是即使在将DF过滤为仅包含第2个分区后,我仍然得到了5列。但是,第2个分区内实际的Parquet文件并没有那第5列,而Spark会为该列插入null,这不是我想要的。当我过滤第2个分区时,我只想要4列。 - akm
好的,既然我得到了你,我认为解决这个问题最简单、最可靠的方法是@Shu刚才说的。但它不是通用的,我喜欢通用的、可重复使用的解决方案。你已经看过在读取分区Parquet文件时的mergeSchema选项了吗? - maxgruber19
我不能直接给出.schema并读取,因为我将转换为rdd并在读取后操作数据,最后,我将再次使用此处的schema转换为DF。目前,我已更改以直接读取分区目录,在这种情况下,我不会在DF中获得分区列,因此我手动添加它,但是当我读取一个分区时,它会提供正确的模式,该模式仅在该分区内可用,而不像先前的方法那样提供额外的模式。 - akm

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接