Spark列出所有分区数据的叶子节点。

26

我有按日期小时分区的Parquet数据,文件夹结构如下:

events_v3
  -- event_date=2015-01-01
    -- event_hour=2015-01-1
      -- part10000.parquet.gz
  -- event_date=2015-01-02
    -- event_hour=5
      -- part10000.parquet.gz
我使用Spark创建了一个名为raw_events的表,但是当我尝试查询数据时,它会扫描所有目录以寻找页脚,这会减慢初始查询速度,即使我只查询一天的数据。查询语句如下:select * from raw_events where event_date='2016-01-01'。类似的问题可以参考链接:http://mail-archives.apache.org/mod_mbox/spark-user/201508.mbox/%3CCAAswR-7Qbd2tdLSsO76zyw9tvs-Njw2YVd36bRfCG3DKZrH0tw@mail.gmail.com%3E(但是这个链接比较老)。
App > 16/09/15 03:14:03 main INFO HadoopFsRelation: Listing leaf files and directories in parallel under: s3a://bucket/events_v3/

然后它生成350个任务,因为有350天的数据。

我已经禁用了schemaMerge,并且也指定了要读取的模式,所以它可以直接转到我正在查看的分区,为什么还要打印所有叶子文件? 使用2个执行器列出叶子文件需要10分钟,而查询实际执行只需要20秒。

代码示例:

val sparkSession = org.apache.spark.sql.SparkSession.builder.getOrCreate()
val df = sparkSession.read.option("mergeSchema","false").format("parquet").load("s3a://bucket/events_v3")
    df.createOrReplaceTempView("temp_events")
    sparkSession.sql(
      """
        |select verb,count(*) from temp_events where event_date = "2016-01-01" group by verb
      """.stripMargin).show()

1
@lostinoverflow 我仍然没有找到为什么我们要进行递归读取,但是我已经成功将10分钟的初始扫描缩短为1分钟。有效地将查询时间减少到不到2分钟。 - Gaurav Shah
顺便问一下,你在用什么版本的Spark?Spark 2.0中有一些已修复的错误可以修复谓词下推,可能与此相关。 - Igor Berman
1
@LostInOverflow 当我们尝试查询Spark时,它会创建一个路径目录,其中内部递归列出所有文件夹。它首先调用以获取文件夹列表,然后对每个文件夹再次进行查询,并递归执行此过程。在S3中,这个过程非常缓慢。我将Spark的递归调用移动到了S3文件系统中。在那里,我可以要求S3有效地获取所有具有前缀“events_v3/”的文件,从而递归获取所有文件。在我的情况下,它将48,000个API调用减少到了300个API调用。 - Gaurav Shah
1
@GauravShah 你能把它发布为答案吗?如果没有更好的解决方案,我想给予奖励。 - user6022341
显示剩余4条评论
2个回答

17
一旦Spark读取一个目录,它会调用listLeafFiles(org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala)方法。这个方法又会调用fs.listStatus来获取文件和目录的列表。对于每个目录,该方法会再次被调用,这个过程会递归进行直到没有目录为止。这种设计在HDFS系统中效果很好,但在S3中表现不佳,因为列出文件需要通过RPC调用。而S3支持按前缀获取全部文件,这正是我们所需要的。
例如,如果我们有上面提到的目录结构,其中包含一年的数据,每个小时有10个子目录,那么我们需要进行87,000次API调用。但是由于只有137,000个文件,这可以减少到138个API调用。每个S3 API调用返回1000个文件。
代码:org/apache/hadoop/fs/s3a/S3AFileSystem.java
public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
            IOException {
        String key = pathToKey(f);
        if (LOG.isDebugEnabled()) {
            LOG.debug("List status for path: " + f);
        }

        final List<FileStatus> result = new ArrayList<FileStatus>();
        final FileStatus fileStatus =  getFileStatus(f);

        if (fileStatus.isDirectory()) {
            if (!key.isEmpty()) {
                key = key + "/";
            }

            ListObjectsRequest request = new ListObjectsRequest();
            request.setBucketName(bucket);
            request.setPrefix(key);
            request.setMaxKeys(maxKeys);

            if (LOG.isDebugEnabled()) {
                LOG.debug("listStatus: doing listObjects for directory " + key);
            }

            ObjectListing objects = s3.listObjects(request);
            statistics.incrementReadOps(1);

            while (true) {
                for (S3ObjectSummary summary : objects.getObjectSummaries()) {
                    Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
                    // Skip over keys that are ourselves and old S3N _$folder$ files
                    if (keyPath.equals(f) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Ignoring: " + keyPath);
                        }
                        continue;
                    }

                    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
                        result.add(new S3AFileStatus(true, true, keyPath));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fd: " + keyPath);
                        }
                    } else {
                        result.add(new S3AFileStatus(summary.getSize(),
                                dateToLong(summary.getLastModified()), keyPath,
                                getDefaultBlockSize(f.makeQualified(uri, workingDir))));
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Adding: fi: " + keyPath);
                        }
                    }
                }

                for (String prefix : objects.getCommonPrefixes()) {
                    Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
                    if (keyPath.equals(f)) {
                        continue;
                    }
                    result.add(new S3AFileStatus(true, false, keyPath));
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding: rd: " + keyPath);
                    }
                }

                if (objects.isTruncated()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("listStatus: list truncated - getting next batch");
                    }

                    objects = s3.listNextBatchOfObjects(objects);
                    statistics.incrementReadOps(1);
                } else {
                    break;
                }
            }
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding: rd (not a dir): " + f);
            }
            result.add(fileStatus);
        }

        return result.toArray(new FileStatus[result.size()]);
    }

/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala

def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
    logTrace(s"Listing ${status.getPath}")
    val name = status.getPath.getName.toLowerCase
    if (shouldFilterOut(name)) {
      Array.empty[FileStatus]
    }
    else {
      val statuses = {
        val stats = if(fs.isInstanceOf[S3AFileSystem]){
          logWarning("Using Monkey patched version of list status")
          println("Using Monkey patched version of list status")
          val a = fs.asInstanceOf[S3AFileSystem].listStatusRecursively(status.getPath)
          a
//          Array.empty[FileStatus]
        }
        else{
          val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
          files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))

        }
        if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
      }
      // statuses do not have any dirs.
      statuses.filterNot(status => shouldFilterOut(status.getPath.getName)).map {
        case f: LocatedFileStatus => f

        // NOTE:
        //
        // - Although S3/S3A/S3N file system can be quite slow for remote file metadata
        //   operations, calling `getFileBlockLocations` does no harm here since these file system
        //   implementations don't actually issue RPC for this method.
        //
        // - Here we are calling `getFileBlockLocations` in a sequential manner, but it should not
        //   be a big deal since we always use to `listLeafFilesInParallel` when the number of
        //   paths exceeds threshold.
        case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
      }
    }
  }

6
为了澄清Gaurav的回答,这段代码片段来自于Hadoop分支2,可能不会在Hadoop 2.9之前出现(参见HADOOP-13208);有人需要更新Spark以使用该功能(这不会影响使用HDFS的代码,只是不会在那里显示任何加速)。考虑的一件事是:什么样的文件布局适合对象存储。不要有深层目录树,每个目录中只有几个文件;而应该有许多文件的浅树形结构。考虑使用文件的前几个字符表示最常变化的值(例如日期/小时),而不是最后几个字符。为什么?一些对象存储似乎使用前导字符进行哈希,而不是尾随字符...如果您的名称更具唯一性,则它们会分散到更多服务器上,具有更好的带宽/较少的限制风险。如果您正在使用Hadoop 2.7库,请切换到s3a://而非s3n://。它已经更快,并且每周都在变得更好,至少在ASF源代码树中。
最后,Apache Hadoop、Apache Spark以及相关项目都是开源的。欢迎贡献。这不仅仅是代码,还包括文档、测试,以及对于性能方面的东西,需要使用实际数据集进行测试。甚至告诉我们什么原因导致问题(以及你的数据集布局)也很有趣。

1
他们已经将此修复程序回溯到2.8.0版本中,该版本应该在几周内发布 :) - Gaurav Shah
不知道时间表;没有人开始发布流程。我相信它将在HDP-2.5中发布,如果它不起作用,我会接到支持电话,并且需要打支持电话。随着2.8 RC流程的开始,测试将有所帮助。Spark无论如何都不会加速,因为它也需要进行调整,而且还有其他要考虑的事情。通过将数据放在较少的目录中(例如按月份而不是按天数),可以使生活更轻松。 - stevel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接