为什么Apache Spark会读取嵌套结构中不必要的Parquet列?

23

我的团队正在使用Spark构建ETL过程,将原始分隔符文本文件加载到基于Parquet的“数据湖”中。 Parquet列存储的一个承诺是查询只会读取必要的“列条纹”。

但是,对于嵌套模式结构,我们发现正在阅读意外的列。

为了说明这一点,这里有一个使用Scala和Spark 2.0.1 shell的POC:

// Preliminary setup
sc.setLogLevel("INFO")
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// Create a schema with nested complex structures
val schema = StructType(Seq(
    StructField("F1", IntegerType),
    StructField("F2", IntegerType),
    StructField("Orig", StructType(Seq(
        StructField("F1", StringType),
        StructField("F2", StringType))))))

// Create some sample data
val data = spark.createDataFrame(
    sc.parallelize(Seq(
        Row(1, 2, Row("1", "2")),
        Row(3, null, Row("3", "ABC")))),
    schema)

// Save it
data.write.mode(SaveMode.Overwrite).parquet("data.parquet")

然后,我们将文件读取回DataFrame,并投影到列的子集:

// Read it back into another DataFrame
val df = spark.read.parquet("data.parquet")

// Select & show a subset of the columns
df.select($"F1", $"Orig.F1").show

当运行此命令时,我们会看到预期的输出:
+---+-------+
| F1|Orig_F1|
+---+-------+
|  1|      1|
|  3|      3|
+---+-------+

但是...查询计划显示了略微不同的情况:

“优化后的计划”显示:

val projected = df.select($"F1", $"Orig.F1".as("Orig_F1"))
projected.queryExecution.optimizedPlan
// Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- Relation[F1#18,F2#19,Orig#20] parquet

“解释”即表示:

projected.explain
// == Physical Plan ==
// *Project [F1#18, Orig#20.F1 AS Orig_F1#116]
// +- *Scan parquet [F1#18,Orig#20] Format: ParquetFormat, InputPaths: hdfs://sandbox.hortonworks.com:8020/user/stephenp/data.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<F1:int,Orig:struct<F1:string,F2:string>>

执行期间生成的 INFO 日志也确认了 Orig.F2 列被意外读取:

16/10/21 15:13:15 INFO parquet.ParquetReadSupport: Going to read the following fields from the Parquet file:

Parquet form:
message spark_schema {
  optional int32 F1;
  optional group Orig {
    optional binary F1 (UTF8);
    optional binary F2 (UTF8);
  }
}

Catalyst form:
StructType(StructField(F1,IntegerType,true), StructField(Orig,StructType(StructField(F1,StringType,true), StructField(F2,StringType,true)),true))

根据Dremel论文Parquet文档,复杂嵌套结构的列应该独立存储和独立检索。
问题:
  1. 这种行为是当前Spark查询引擎的限制吗?换句话说,Parquet是否支持最优执行此查询,但Spark的查询计划者很简单?
  2. 还是说这是当前Parquet实现的限制?
  3. 还是我没有正确使用Spark API?
  4. 或者,我误解了Dremel / Parquet列存储的工作原理?
可能相关:为什么在Spark SQL中嵌套列的查询性能不同?

这是一个Spark查询引擎问题。 - user6022341
根据 @julien-le-dem 的说法,Parquet 应该支持这种情况。https://twitter.com/J_/status/789584704169123841 - Peter Stephens
1
https://github.com/apache/spark/pull/16578 解决了这个问题。 - Gaurav Shah
2
更新。之前的PR已经关闭且没有解决方案,现在开了一个新的简化版PR。在这里跟踪新的PR:https://github.com/apache/spark/pull/21320 - Peter Stephens
你是否打开了 SQL 标志以启用此优化?2.4.0 版似乎已经为我解决了这个问题。 - Gaurav Shah
显示剩余5条评论
2个回答

5

谓词下推的限制不应影响投影操作。问题可能有关但并非相同。 - user6022341
抱歉,在我的回答中使用了“谓词”这个词,但链接的 JIRA 票据标题是“Parquet filter push down doesn't handle struct fields”。 - Ewan Leith
不确定这是否是答案,但会去看一下。原始问题没有过滤器,因此谓词下推不应该适用。 - Peter Stephens

0

自 Spark 2.4.0 起,该问题已得到解决。这适用于结构体以及结构体数组。

在 Spark 3.0.0 之前:

spark.sql.optimizer.nestedSchemaPruning.enabled 设置为 true

相关 Jira 请参见:https://issues.apache.org/jira/browse/SPARK-4502

在 Spark 3.0.0 之后:

spark.sql.optimizer.nestedSchemaPruning.enabled 的默认值现在为 true

相关 Jira 请参见:https://issues.apache.org/jira/browse/SPARK-29805

还有一个相关的 SO 问题:Efficient reading nested parquet column in Spark


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接