这需要进行细分。
FileSystem
seek()
+read()
或readFully(position, buffer, length)
调用?是的。spark.hadoop.fs.s3a.experimental.fadvise=random
以触发随机访问。Hadoop 2.7及以下版本对文件的大量seek()操作处理不佳,因为它们总是启动GET offset-end-of-file,由于下一个seek的出现而感到惊讶,必须中止该连接,重新打开新的TCP/HTTPS 1.1连接(速度慢,CPU消耗大),再次执行同样的操作。随机IO操作会影响加载. csv.gz等大量数据时的速度,但对于获得ORC / Parquet性能至关重要。
您不会在Hadoop 2.7的hadoop-aws JAR上获得加速。如果需要,请更新hadoop*.jar和依赖项,或者从头开始构建Spark针对Hadoop 2.8。
请注意,Hadoop 2.8+还具有一个很好的功能:如果在日志语句中调用S3A文件系统客户端的toString()
,它会打印出所有文件系统IO统计信息,包括在寻找、中止TCP连接等过程中丢弃的数据量。帮助您了解正在发生的情况。
2018-04-13 警告:不要尝试将Hadoop 2.8+的hadoop-aws
JAR与其余的hadoop-2.7 JAR集一起放在类路径中,并期望看到任何加速。您将只会看到堆栈跟踪。您需要更新所有hadoop JAR及其传递依赖项。
免责声明:我没有明确的答案,也不想充当权威来源,但我在Spark 2.2+中花费了一些时间来支持parquet,并希望我的回答能够帮助我们大家更接近正确的答案。
Parquet在S3上是否避免从S3拉取未使用的列的数据,仅检索所需的文件块,还是会拉取整个文件?
我使用今天直接从master构建的Spark2.3.0-SNAPSHOT。
parquet
数据源格式由ParquetFileFormat处理,它是一个FileFormat。
如果我理解正确,读取部分由buildReaderWithPartitionValues方法处理(覆盖了FileFormat
的方法)。
buildReaderWithPartitionValues
仅用于当请求 FileSourceScanExec
物理运算符的所谓输入 RDD(实际上是单个 RDD)以在执行 WholeStageCodegenExec
时生成内部行时。
话虽如此,我认为审查 buildReaderWithPartitionValues
所做的事情可能会使我们更接近最终答案。
当您查看 该行 时,可以确信我们正在正确的轨道上。
// 尝试在启用过滤器下推时下推过滤器。
该代码路径取决于默认情况下 已打开 的 spark.sql.parquet.filterPushdown
Spark 属性。
spark.sql.parquet.filterPushdown 设置为 true 时启用 Parquet 过滤器下推优化。
这导致我们使用Parquet-hadoop的ParquetInputFormat.setFilterPredicate当且仅当过滤器被定义。
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
当代码回退到parquet-mr时,过滤器被使用时,代码变得更加有趣(而不是使用所谓的矢量化parquet解码读取器)。那是我不太理解的部分(除了我在代码中能看到的部分)。
请注意,矢量化parquet解码读取器由spark.sql.parquet.enableVectorizedReader
Spark属性控制,默认情况下已打开。
TIP: 要知道if
表达式的哪一部分被使用,请为org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
记录器启用DEBUG
日志级别。
为了查看所有推送的下推过滤器,您可以将org.apache.spark.sql.execution.FileSourceScanExec
记录器的日志级别设置为INFO
。您应该在日志中看到以下内容:
INFO Pushed Filters: [pushedDownFilters]
Spark的Parquet阅读器就像任何其他InputFormat一样,
没有任何一个inputFormat对S3有特殊要求。输入格式可以从LocalFileSystem、Hdfs和S3中读取,没有为此做特殊优化。
Parquet InpuTFormat根据您请求的列进行选择性读取。
如果您想确保(尽管下推谓词在最新的Spark版本中有效),请手动选择列并编写转换和操作,而不是依赖SQL。
以下是最近因Spark 2.1无法根据存储在Parquet文件中的元数据计算数据集中所有行的COUNT(*)
而开启的问题报告示例:https://issues.apache.org/jira/browse/SPARK-21074