如何在Hadoop HDFS中列出目录及其子目录中的所有文件

27

我在hdfs上有一个文件夹,其中有两个子文件夹,每个子文件夹大约有30个子文件夹,最终每个子文件夹都包含xml文件。 我想列出所有的xml文件,只给出主文件夹的路径。 在本地,我可以使用apache commons-io的 FileUtils.listFiles()来做到这一点。 我已经尝试过了。

FileStatus[] status = fs.listStatus( new Path( args[ 0 ] ) );

但它只列出了前两个子文件夹,而没有继续往下查找。在Hadoop中有没有其他方法可以实现这一点?


我知道这是一个关于Java的问题,但如果其他人可以使用操作系统命令,hadoop fs -ls -R /user/your_directory应该会递归地列出目录。 - user9074332
9个回答

34

如果您正在使用Hadoop 2.* API,那么有更加优雅的解决方案:

    Configuration conf = getConf();
    Job job = Job.getInstance(conf);
    FileSystem fs = FileSystem.get(conf);

    //the second boolean parameter here sets the recursion to true
    RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(
            new Path("path/to/lib"), true);
    while(fileStatusListIterator.hasNext()){
        LocatedFileStatus fileStatus = fileStatusListIterator.next();
        //do stuff with the file like ...
        job.addFileToClassPath(fileStatus.getPath());
    }

2
getConf()方法是什么? - vishnu viswanath
getConf()Configured 类中的一个方法。你的类最好继承它。 - Prasoon Joshi
我发现使用 FileSystem fs = path.getFileSystem(conf) 代替 FileSystem fs = FileSystem.get(conf) 可以正确加载文件系统类型(如 hdfs://,gs://...),否则会出现 java.lang.IllegalArgumentException: Wrong FS ... expected: file:/// 的错误。 - Martin Tapp

19
您需要使用 FileSystem 对象并对其中的 FileStatus 对象执行一些逻辑,以手动递归到子目录中。
您还可以应用 PathFilter 来仅返回 XML 文件,使用 listStatus(Path, PathFilter) 方法。
Hadoop FsShell 类对 hadoop fs -lsr 命令进行了示例,这是一个递归的 ls 命令 - 请参见 源代码,大约在第590行(递归步骤在第635行触发)。

最终,我做了一个比你建议的更简单的实现,但是你给了我灵感。谢谢! - nik686
4
失效的参考链接 - AkD
1
@nik686,你能否提供一下你所采用的解决方案? - loneStar
1
我发现使用 FileSystem fs = path.getFileSystem(conf); 而不是 FileSystem fs = FileSystem.get(conf) 可以正确加载文件系统类型 (hdfs://, gs://...),否则会出现 java.lang.IllegalArgumentException: Wrong FS ... expected: file:/// 的错误提示。 - Martin Tapp

17
/**
 * @param filePath
 * @param fs
 * @return list of absolute file path present in given path
 * @throws FileNotFoundException
 * @throws IOException
 */
public static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
        if (fileStat.isDirectory()) {
            fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
        } else {
            fileList.add(fileStat.getPath().toString());
        }
    }
    return fileList;
}

快速示例:假设您有以下文件结构:

a  ->  b
   ->  c  -> d
          -> e 
   ->  d  -> f

使用上面的代码,您将得到:

a/b
a/c/d
a/c/e
a/d/f

如果你只想要叶子节点(即文件名),在else代码块中使用以下代码:

 ...
    } else {
        String fileName = fileStat.getPath().toString(); 
        fileList.add(fileName.substring(fileName.lastIndexOf("/") + 1));
    }

这将会得到:

b
d
e
f

fileStat.getPath().toString() 的结果格式为 hdfs://nameservice1/knime/1/2e4a7753-ce62-4ab1-85a1-7dc4e7aae3d5.xlsx,以 hdfs:// 开头。 - Mr Lou
我发现使用FileSystem fs = path.getFileSystem(conf) 而不是 FileSystem fs = FileSystem.get(conf) 可以正确加载fs类型(hdfs://,gs://...),否则会出现java.lang.IllegalArgumentException: Wrong FS ... expected: file:///的错误。 - Martin Tapp

14

你尝试过这个吗:

import java.io.*;
import java.util.*;
import java.net.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

public class cat{
    public static void main (String [] args) throws Exception{
        try{
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus[] status = fs.listStatus(new Path("hdfs://test.com:9000/user/test/in"));  // you need to pass in your hdfs path

            for (int i=0;i<status.length;i++){
                BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(status[i].getPath())));
                String line;
                line=br.readLine();
                while (line != null){
                    System.out.println(line);
                    line=br.readLine();
                }
            }
        }catch(Exception e){
            System.out.println("File not found");
        }
    }
}

是的,我看过相同的例子,我在上面提到过它。但它只列出深度为1的子目录。我想要从主文件夹中获取最终文件。 - nik686

3

现在,可以使用Spark来完成相同的任务,而且速度比其他方法(如Hadoop MR)要快得多。以下是代码片段。

def traverseDirectory(filePath:String,recursiveTraverse:Boolean,filePaths:ListBuffer[String]) {
    val files = FileSystem.get( sparkContext.hadoopConfiguration ).listStatus(new Path(filePath))
            files.foreach { fileStatus => {
                if(!fileStatus.isDirectory() && fileStatus.getPath().getName().endsWith(".xml")) {                
                    filePaths+=fileStatus.getPath().toString()      
                }
                else if(fileStatus.isDirectory()) {
                    traverseDirectory(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
                }
            }
    }   
}

2

适用于递归和非递归方法的代码片段:

//helper method to get the list of files from the HDFS path
public static List<String>
    listFilesFromHDFSPath(Configuration hadoopConfiguration,
                          String hdfsPath,
                          boolean recursive) throws IOException,
                                        IllegalArgumentException
{
    //resulting list of files
    List<String> filePaths = new ArrayList<String>();

    //get path from string and then the filesystem
    Path path = new Path(hdfsPath);  //throws IllegalArgumentException
    FileSystem fs = path.getFileSystem(hadoopConfiguration);

    //if recursive approach is requested
    if(recursive)
    {
        //(heap issues with recursive approach) => using a queue
        Queue<Path> fileQueue = new LinkedList<Path>();

        //add the obtained path to the queue
        fileQueue.add(path);

        //while the fileQueue is not empty
        while (!fileQueue.isEmpty())
        {
            //get the file path from queue
            Path filePath = fileQueue.remove();

            //filePath refers to a file
            if (fs.isFile(filePath))
            {
                filePaths.add(filePath.toString());
            }
            else   //else filePath refers to a directory
            {
                //list paths in the directory and add to the queue
                FileStatus[] fileStatuses = fs.listStatus(filePath);
                for (FileStatus fileStatus : fileStatuses)
                {
                    fileQueue.add(fileStatus.getPath());
                } // for
            } // else

        } // while

    } // if
    else        //non-recursive approach => no heap overhead
    {
        //if the given hdfsPath is actually directory
        if(fs.isDirectory(path))
        {
            FileStatus[] fileStatuses = fs.listStatus(path);

            //loop all file statuses
            for(FileStatus fileStatus : fileStatuses)
            {
                //if the given status is a file, then update the resulting list
                if(fileStatus.isFile())
                    filePaths.add(fileStatus.getPath().toString());
            } // for
        } // if
        else        //it is a file then
        {
            //return the one and only file path to the resulting list
            filePaths.add(path.toString());
        } // else

    } // else

    //close filesystem; no more operations
    fs.close();

    //return the resulting list
    return filePaths;
} // listFilesFromHDFSPath

1
不要使用递归方法(堆栈问题):) 使用队列
queue.add(param_dir)
while (queue is not empty){

  directory=  queue.pop
 - get items from current directory
 - if item is file add to a list (final list)
 - if item is directory => queue.push
}

那很容易,尽情享受!

1

这里是一个代码片段,用于计算特定HDFS目录中的文件数量(我使用它来确定在特定ETL代码中使用多少个reducer)。您可以轻松修改此代码以适应您的需求。

private int calculateNumberOfReducers(String input) throws IOException {
    int numberOfReducers = 0;
    Path inputPath = new Path(input);
    FileSystem fs = inputPath.getFileSystem(getConf());
    FileStatus[] statuses = fs.globStatus(inputPath);
    for(FileStatus status: statuses) {
        if(status.isDirectory()) {
            numberOfReducers += getNumberOfInputFiles(status, fs);
        } else if(status.isFile()) {
            numberOfReducers ++;
        }
    }
    return numberOfReducers;
}

/**
 * Recursively determines number of input files in an HDFS directory
 *
 * @param status instance of FileStatus
 * @param fs instance of FileSystem
 * @return number of input files within particular HDFS directory
 * @throws IOException
 */
private int getNumberOfInputFiles(FileStatus status, FileSystem fs) throws IOException  {
    int inputFileCount = 0;
    if(status.isDirectory()) {
        FileStatus[] files = fs.listStatus(status.getPath());
        for(FileStatus file: files) {
            inputFileCount += getNumberOfInputFiles(file, fs);
        }
    } else {
        inputFileCount ++;
    }

    return inputFileCount;
}

0

感谢Radu Adrian Moldovan的建议。

这里是使用队列实现的代码:

private static List<String> listAllFilePath(Path hdfsFilePath, FileSystem fs)
throws FileNotFoundException, IOException {
  List<String> filePathList = new ArrayList<String>();
  Queue<Path> fileQueue = new LinkedList<Path>();
  fileQueue.add(hdfsFilePath);
  while (!fileQueue.isEmpty()) {
    Path filePath = fileQueue.remove();
    if (fs.isFile(filePath)) {
      filePathList.add(filePath.toString());
    } else {
      FileStatus[] fileStatus = fs.listStatus(filePath);
      for (FileStatus fileStat : fileStatus) {
        fileQueue.add(fileStat.getPath());
      }
    }
  }
  return filePathList;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接