当Spark读取一个目录时,它会调用
listLeafFiles
(org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala)方法。这个方法又会调用
fs.listStatus
来获取文件和目录的列表。对于其中的每个子目录,该方法会再次被调用,如此递归,直到不再有子目录为止。这种设计在HDFS上效果很好,但在S3上表现不佳,因为在S3上每列出一个目录都需要一次远程API(HTTP)请求。而S3支持按前缀一次性(分页)列出其下的全部对象,这正是我们所需要的。
例如,对于上面提到的目录结构,假设其中包含一年的数据,每个小时有10个子目录,那么逐目录递归列出需要约87,000次以上的API调用(仅叶子目录就有 365 × 24 × 10 = 87,600 个)。而每次S3列表API调用最多返回1000个对象,总共只有137,000个文件,因此按前缀列出只需约138次API调用。
代码:
org/apache/hadoop/fs/s3a/S3AFileSystem.java
/**
 * Recursively lists everything under {@code f} using flat, paged S3 prefix listings.
 *
 * <p>No delimiter is set on the ListObjects request, so S3 returns every key below the
 * prefix (paged, up to {@code maxKeys} keys per request) instead of only the immediate
 * children. A whole subtree is therefore listed without one request per sub-directory.
 *
 * @param f the path to list
 * @return if {@code f} is a directory: one status per object below it, skipping the
 *         directory itself and keys ending in {@code S3N_FOLDER_SUFFIX}; objects for which
 *         {@code objectRepresentsDirectory} holds are reported as directories, all others
 *         as files. If {@code f} is not a directory: a one-element array with its status.
 * @throws FileNotFoundException propagated from {@code getFileStatus(f)}
 * @throws IOException on other failures
 */
public FileStatus[] listStatusRecursively(Path f) throws FileNotFoundException,
    IOException {
  String key = pathToKey(f);
  if (LOG.isDebugEnabled()) {
    LOG.debug("List status for path: " + f);
  }
  final List<FileStatus> result = new ArrayList<FileStatus>();
  final FileStatus fileStatus = getFileStatus(f);
  if (fileStatus.isDirectory()) {
    if (!key.isEmpty()) {
      key = key + "/";
    }
    // Every key below is converted into a *qualified* path, so the "is this the directory
    // itself?" check must compare against a qualified f as well; with an unqualified
    // argument the raw comparison never matches and the directory's own marker object
    // would be returned as one of its children.
    final Path qualifiedF = f.makeQualified(uri, workingDir);
    // The block size argument is the same for every object; compute it once rather than
    // once per listed key.
    final long blockSize = getDefaultBlockSize(qualifiedF);

    ListObjectsRequest request = new ListObjectsRequest();
    request.setBucketName(bucket);
    request.setPrefix(key);
    request.setMaxKeys(maxKeys);
    // Intentionally no delimiter: the listing covers the whole subtree under the prefix.
    if (LOG.isDebugEnabled()) {
      LOG.debug("listStatus: doing listObjects for directory " + key);
    }
    ObjectListing objects = s3.listObjects(request);
    statistics.incrementReadOps(1);
    while (true) {
      for (S3ObjectSummary summary : objects.getObjectSummaries()) {
        Path keyPath = keyToPath(summary.getKey()).makeQualified(uri, workingDir);
        // Skip the listed directory itself and (presumably legacy S3N) folder-suffix keys.
        if (keyPath.equals(qualifiedF) || summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Ignoring: " + keyPath);
          }
          continue;
        }
        if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
          result.add(new S3AFileStatus(true, true, keyPath));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Adding: fd: " + keyPath);
          }
        } else {
          result.add(new S3AFileStatus(summary.getSize(),
              dateToLong(summary.getLastModified()), keyPath, blockSize));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Adding: fi: " + keyPath);
          }
        }
      }
      // S3 only groups keys into common prefixes when a delimiter is set, so this is
      // expected to be empty here; it is kept defensively.
      for (String prefix : objects.getCommonPrefixes()) {
        Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
        if (keyPath.equals(qualifiedF)) {
          continue;
        }
        result.add(new S3AFileStatus(true, false, keyPath));
        if (LOG.isDebugEnabled()) {
          LOG.debug("Adding: rd: " + keyPath);
        }
      }
      // Follow pagination until S3 reports the listing is complete.
      if (objects.isTruncated()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("listStatus: list truncated - getting next batch");
        }
        objects = s3.listNextBatchOfObjects(objects);
        statistics.incrementReadOps(1);
      } else {
        break;
      }
    }
  } else {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Adding: rd (not a dir): " + f);
    }
    result.add(fileStatus);
  }
  return result.toArray(new FileStatus[result.size()]);
}
org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
/**
 * Lists the leaf (non-directory) files under `status`, skipping hidden/temporary entries.
 *
 * For an `S3AFileSystem` a single flat prefix listing (`listStatusRecursively`) replaces the
 * per-directory recursion used for every other file system. That flat listing also contains
 * directory entries, and files nested below directories that the recursive walk would have
 * pruned via `shouldFilterOut`. Both are removed here so that the S3A branch returns the
 * same kind of result as the recursive branch.
 *
 * @param fs     file system that owns `status`
 * @param status root file or directory to list
 * @param filter optional filter applied to the leaf files' paths; may be null
 * @return located statuses of the leaf files that survive all filtering
 */
def listLeafFiles(fs: FileSystem, status: FileStatus, filter: PathFilter): Array[FileStatus] = {
  logTrace(s"Listing ${status.getPath}")
  val name = status.getPath.getName.toLowerCase
  if (shouldFilterOut(name)) {
    Array.empty[FileStatus]
  } else {
    val stats = fs match {
      case s3a: S3AFileSystem =>
        logWarning("Using Monkey patched version of list status")
        // Listed paths come back qualified; qualify the root the same way for comparison.
        val root = fs.makeQualified(status.getPath)
        // The recursive branch drops a whole subtree when a directory name is filtered
        // out (checked lower-cased, as above). Emulate that by testing every ancestor
        // strictly between the root and the file; a file that *is* the root has none.
        def underFilteredDir(leaf: FileStatus): Boolean =
          Iterator.iterate(leaf.getPath)(_.getParent)
            .takeWhile(p => p != null && p != root)
            .drop(1) // the leaf itself is checked by the final name filter below
            .exists(p => shouldFilterOut(p.getName.toLowerCase))
        s3a.listStatusRecursively(status.getPath)
          .filterNot(s => s.isDirectory || underFilteredDir(s))
      case _ =>
        val (dirs, files) = fs.listStatus(status.getPath).partition(_.isDirectory)
        files ++ dirs.flatMap(dir => listLeafFiles(fs, dir, filter))
    }
    val statuses = if (filter != null) stats.filter(f => filter.accept(f.getPath)) else stats
    statuses.filterNot(s => shouldFilterOut(s.getPath.getName)).map {
      case f: LocatedFileStatus => f
      case f => createLocatedFileStatus(f, fs.getFileBlockLocations(f, 0, f.getLen))
    }
  }
}