使用Spark列出Hadoop HDFS目录中的所有文件?

15
我想循环遍历Hadoop目录中的所有文本文件,并计算单词“error”的出现次数。是否可以使用Apache Spark Scala API的hadoop fs -ls /users/ubuntu/列出目录中的所有文件?从给定的第一个示例中,Spark上下文似乎只能通过类似以下方式逐个访问文件:
val file = spark.textFile("hdfs://target_load_file.txt")
在我的问题中,我不知道HDFS文件夹中有多少个文件或它们的名称。查看了Spark上下文文档,但找不到这种功能。

在我的问题中,我不知道HDFS文件夹中有多少个文件或它们的名称。查看了Spark上下文文档,但找不到这种功能。

3个回答

17

您可以使用通配符:

val errorCount = sc.textFile("hdfs://some-directory/*")
                   .flatMap(_.split(" ")).filter(_ == "error").count

如果我想报告错误发生的文件名怎么办? - Santiago Cepas
2
使用sc.wholeTextFiles。参见https://dev59.com/kYnca4cB1Zd3GeqP_35G,几乎是相同的问题。 - Daniel Darabos

6
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import scala.collection.mutable.Stack


val fs = FileSystem.get( sc.hadoopConfiguration )
var dirs = Stack[String]()
val files = scala.collection.mutable.ListBuffer.empty[String]
val fs = FileSystem.get(sc.hadoopConfiguration)

dirs.push("/user/username/")

while(!dirs.isEmpty){
    val status = fs.listStatus(new Path(dirs.pop()))
    status.foreach(x=> if(x.isDirectory) dirs.push(x.getPath.toString) else 
    files+= x.getPath.toString)
}
files.foreach(println)

1
这是对标题中所述问题最严谨正确的答案。被采纳的答案为OP提供了更具体的回答,针对问题本身所要求的内容,但任何从Google来的人都可能会寻找这个答案。 - Z4-tier

2

对于本地安装而言,(HDFS默认路径fs.defaultFS可在读取/etc/hadoop/core.xml时找到):

例如:

import org.apache.hadoop.fs.{FileSystem, Path}

val conf = sc.hadoopConfiguration
conf.set("fs.defaultFS", "hdfs://localhost:9000")
val hdfs: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.FileSystem.get(conf)
 
val fileStatus = hdfs.listStatus(new Path("hdfs://localhost:9000/foldername/"))
val fileList = fileStatus.map(x => x.getPath.toString)
fileList.foreach(println)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接