如何在Spark Scala shell中列出HDFS位置中的所有CSV文件?

7

这样做的目的是为了在HDFS的第二个位置操作和保存每个数据文件的副本。我将使用

RddName.coalesce(1).saveAsTextFile(pathName)

将结果保存到HDFS。

这就是为什么我想要单独处理每个文件,即使我知道性能不会像批量处理那样高效。但是,我还没有确定如何将CSV文件路径列表存储到字符串数组中,然后使用单独的RDD循环遍历每个文件。

让我们使用以下匿名示例作为HDFS源位置:

/data/email/click/date=2015-01-01/sent_20150101.csv
/data/email/click/date=2015-01-02/sent_20150102.csv
/data/email/click/date=2015-01-03/sent_20150103.csv

我知道如何使用Hadoop FS Shell列出文件路径:

HDFS DFS -ls /data/email/click/*/*.csv

我知道如何为所有数据创建一个RDD:
val sentRdd = sc.textFile( "/data/email/click/*/*.csv" )
3个回答

10

我没有彻底测试过它,但像这样的东西似乎可以起作用:

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.hadoop.fs.{FileSystem, Path, LocatedFileStatus, RemoteIterator}
import java.net.URI

val path: String = ???

val hconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hconf)
val iter = hdfs.listFiles(new Path(path), false)

def listFiles(iter: RemoteIterator[LocatedFileStatus]) = {
  def go(iter: RemoteIterator[LocatedFileStatus], acc: List[URI]): List[URI] = {
    if (iter.hasNext) {
      val uri = iter.next.getPath.toUri
      go(iter, uri :: acc)
    } else {
      acc
    }
  }
  go(iter, List.empty[java.net.URI])
}

listFiles(iter).filter(_.toString.endsWith(".csv"))

你使用URI的原因是什么?我能否只使用Path,并返回List [Path]作为结果? - soMuchToLearnAndShare
@MinnieShi 我认为没有任何理由,你不能这样做。 - zero323
尾递归可以被while循环所替代。 - John Lin

1
这是最终对我有效的代码:

import org.apache.hadoop.fs._
import org.apache.spark.deploy.SparkHadoopUtil
import java.net.URI

val hdfs_conf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
val hdfs = FileSystem.get(hdfs_conf)
// source data in HDFS
val sourcePath = new Path("/<source_location>/<filename_pattern>")

hdfs.globStatus( sourcePath ).foreach{ fileStatus =>
   val filePathName = fileStatus.getPath().toString()
   val fileName = fileStatus.getPath().getName()

   // < DO STUFF HERE>

} // end foreach loop

-1

使用sc.wholeTextFiles(path)应该可以解决问题。它会返回一个包含(文件路径,文件内容)的rdd。


3
那样做不就是在使用数据吗?我只想遍历每个文件路径。 - Jaime

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接