Spark是否支持对存储在S3的parquet文件进行真正的列扫描?

36

Parquet数据存储格式的一个巨大优势是它是按列存储的。如果我的数据集有几百列,但我的查询只涉及其中几列,那么就可以只读取存储这几列数据的数据,跳过其余部分。

想必这个功能是通过在 Parquet 文件头部读取一些元数据,指示每个列在文件系统上的位置来实现的。然后读取器可以在磁盘上寻址,仅读取必要的列。

是否有人知道 Spark 的默认 Parquet 读取器是否正确地实现了这种对 S3 的选择性寻址?我认为S3支持这种方式,但理论支持和正确利用该支持的具体实现之间存在很大差异。


7
我之所以问这个问题,是因为我注意到一些Spark/Parquet宣传的功能尚未得到正确实现,例如谓词下推,它使得只能读取某些分区。我对此感到惊讶,并开始想知道Parquet/Spark有多少功能确实如宣传所述。 - conradlee
4个回答

21

这需要进行细分。

  1. Parquet代码是否从Spark获取谓词(是)?
  2. 然后,Parquet尝试仅选择性地读取那些列,使用Hadoop FileSystem seek()+read()readFully(position, buffer, length)调用?是的。
  3. S3连接器是否将这些文件操作转换为高效的HTTP GET请求?在Amazon EMR中:是的。在Apache Hadoop中,您需要将hadoop 2.8放入类路径中,并设置正确的spark.hadoop.fs.s3a.experimental.fadvise=random以触发随机访问。

Hadoop 2.7及以下版本对文件的大量seek()操作处理不佳,因为它们总是启动GET offset-end-of-file,由于下一个seek的出现而感到惊讶,必须中止该连接,重新打开新的TCP/HTTPS 1.1连接(速度慢,CPU消耗大),再次执行同样的操作。随机IO操作会影响加载. csv.gz等大量数据时的速度,但对于获得ORC / Parquet性能至关重要。

您不会在Hadoop 2.7的hadoop-aws JAR上获得加速。如果需要,请更新hadoop*.jar和依赖项,或者从头开始构建Spark针对Hadoop 2.8。

请注意,Hadoop 2.8+还具有一个很好的功能:如果在日志语句中调用S3A文件系统客户端的toString(),它会打印出所有文件系统IO统计信息,包括在寻找、中止TCP连接等过程中丢弃的数据量。帮助您了解正在发生的情况。

2018-04-13 警告:不要尝试将Hadoop 2.8+的hadoop-awsJAR与其余的hadoop-2.7 JAR集一起放在类路径中,并期望看到任何加速。您将只会看到堆栈跟踪。您需要更新所有hadoop JAR及其传递依赖项。


谢谢您对此进行详细解释!我认为这个分解是其他答案所缺乏的。 - conradlee

11

免责声明:我没有明确的答案,也不想充当权威来源,但我在Spark 2.2+中花费了一些时间来支持parquet,并希望我的回答能够帮助我们大家更接近正确的答案。


Parquet在S3上是否避免从S3拉取未使用的列的数据,仅检索所需的文件块,还是会拉取整个文件?

我使用今天直接从master构建的Spark2.3.0-SNAPSHOT

parquet数据源格式由ParquetFileFormat处理,它是一个FileFormat

如果我理解正确,读取部分由buildReaderWithPartitionValues方法处理(覆盖了FileFormat的方法)。

buildReaderWithPartitionValues 仅用于当请求 FileSourceScanExec 物理运算符的所谓输入 RDD(实际上是单个 RDD)以在执行 WholeStageCodegenExec 时生成内部行时。

话虽如此,我认为审查 buildReaderWithPartitionValues 所做的事情可能会使我们更接近最终答案。

当您查看 该行 时,可以确信我们正在正确的轨道上。

// 尝试在启用过滤器下推时下推过滤器。

该代码路径取决于默认情况下 已打开spark.sql.parquet.filterPushdown Spark 属性。

spark.sql.parquet.filterPushdown 设置为 true 时启用 Parquet 过滤器下推优化。

这导致我们使用Parquet-hadoop的ParquetInputFormat.setFilterPredicate当且仅当过滤器被定义。

if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}

当代码回退到parquet-mr时,过滤器被使用时,代码变得更加有趣(而不是使用所谓的矢量化parquet解码读取器)。那是我不太理解的部分(除了我在代码中能看到的部分)。

请注意,矢量化parquet解码读取器由spark.sql.parquet.enableVectorizedReader Spark属性控制,默认情况下已打开。

TIP: 要知道if表达式的哪一部分被使用,请为org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat记录器启用DEBUG日志级别。

为了查看所有推送的下推过滤器,您可以将org.apache.spark.sql.execution.FileSourceScanExec记录器的日志级别设置为INFO。您应该在日志中看到以下内容

INFO Pushed Filters: [pushedDownFilters]

我希望即使这不是最终答案,它也能有所帮助,让其他人接手并尽快形成最终答案。希望不会熄灭 :)

1

Spark的Parquet阅读器就像任何其他InputFormat一样,

  1. 没有任何一个inputFormat对S3有特殊要求。输入格式可以从LocalFileSystem、Hdfs和S3中读取,没有为此做特殊优化。

  2. Parquet InpuTFormat根据您请求的列进行选择性读取。

  3. 如果您想确保(尽管下推谓词在最新的Spark版本中有效),请手动选择列并编写转换和操作,而不是依赖SQL。


3
谢谢您的回答,但即使阅读了它,仍然不清楚最近的spark分发是否真正支持谓词下推。我正在寻找一个可以深入到从s3读取parquet时调用的输入读取器的特定实现,或者进行经验测试的答案。请参见https://dev59.com/kFsW5IYBdhLWcg3wfXRa#41609999--有一个令人惊讶的结果表明s3上的过滤器下推已经失效。 - conradlee
1
请注意Spark版本。早期版本中存在谓词下推的问题,但从2.x版本开始(特别是2.2版本),该问题已得到解决。 - Igor Berman

1
不,谓词下推并没有完全支持。当然,这取决于:
- 具体的使用情况 - Spark 版本 - S3 连接器类型和版本
为了检查您的具体用例,您可以在 Spark 中启用 DEBUG 日志级别,并运行查询。然后,您可以看到是否在 S3(HTTP)请求期间进行了“seeks”,以及实际发送了多少个请求。类似这样的内容:

以下是最近因Spark 2.1无法根据存储在Parquet文件中的元数据计算数据集中所有行的COUNT(*)而开启的问题报告示例:https://issues.apache.org/jira/browse/SPARK-21074


Michael,问题不在于Spark本身,而是它捆绑的Hadoop JARs版本;HDP和CDH中的JARs采用“惰性”查找,如果启用随机IO,则可以高效地进行列式数据读取。关于SPARK-21074,该JIRA等待您升级后的经验;如果您没有得到答案,它可能会被关闭为“已修复/无法重现”。 - stevel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接