如何将多个文本文件读入单个RDD?

187

我希望从hdfs位置读取一堆文本文件,并使用Spark对其进行迭代映射。

JavaRDD<String> records = ctx.textFile(args[1], 1); 只能一次读取一个文件。

我想读取多个文件并将它们作为单个RDD处理。怎么做呢?

10个回答

312
你可以指定整个目录,使用通配符甚至是目录和通配符的CSV。例如:
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

正如Nick Chammas所指出的那样,这是Hadoop的FileInputFormat的一种暴露方式,因此这也适用于Hadoop(和Scalding)。


10
是的,这是将多个文件作为单个RDD打开的最方便的方法。这里的API只是Hadoop的FileInputFormat API的一个暴露,因此所有相同的Path选项都适用。 - Nick Chammas
7
sc.wholeTextFiles对于非按行分隔的数据很方便。 - Michal Čizmazia
1
虽然很奇怪,但如果你这样做并指定并行性,比如 sc.textFile(multipleCommaSeparatedDirs,320),它会导致总共有 19430 个任务而不是 320 个……它的行为就像 union,从非常低的并行性中也会导致疯狂数量的任务。 - lisak
2
我终于找到了这个邪恶的文件模式匹配是如何工作的,因此我不再需要使用逗号分隔。http://stackoverflow.com/a/33917492/306488 - lisak
使用以下API从gzipped文件中读取数据:sc.textFile("/my/directory/*.gz"),并且即使您将其附加到现有的RDD中,最终Spark也会创建新的RDD,因为RDD是弹性的。同时,您提到“所以我可以将其附加到RDD中的行”,请注意即使您附加到现有的RDD中,最终Spark也会创建新的RDD,因为RDD是弹性的。 - Suresh
显示剩余3条评论

35

使用union如下:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

那么bigRdd就是包含所有文件的RDD。


谢谢云,这样我就可以读取所有想要的文件,除了一个!但是还是需要写很多东西... - gsamaras

31

你可以使用单个textFile调用来读取多个文件。Scala:

sc.textFile(','.join(files)) 

5
and identical python syntax - patricksurry
9
我认为这只是 Python 语法。相当于 Scala 的写法是 sc.textFile(files.mkString(",")) - Davos

9

您可以使用这个功能。

首先,您可以获取S3路径的缓冲区/列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

现在将此列表对象传递给以下代码片段,注意:sc是SQLContext的一个对象

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

现在你有了最终的统一RDD,即df

可选的,你还可以将它重新分区成一个单独的大RDD

val files = sc.textFile(filename, 1).repartition(1)

重新分区总是有效的:D

这难道不意味着文件列表必须相对较小吗?不能有数百万个文件。 - Mathieu Longtin
2
我们能否并行读取列出的文件?就像 sc.parallelize 一样? - lazywiz
1
如果您可以将分区发现应用于Spark代码,那就太好了,否则您需要像以前一样进行操作。我曾经在大约一分钟内打开了10k个文件。 - Murtaza Kanchwala
@lazywiz 如果您不想创建单个RDD,则只需删除repartition操作即可。 - Murtaza Kanchwala

3
在PySpark中,我发现了一种额外有用的解析文件的方法。也许在Scala中有一个等效的方法,但我不太熟悉如何提供有效的转换。实际上,它是一种textFile调用,并添加了标签(在下面的示例中,键=文件名,值=文件中的第1行)。 “带标签”的textFile 输入:
import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

输出:数组,每个条目都包含一个元组,使用文件名作为键,并且值等于文件的每一行。(技术上讲,使用这种方法,您也可以使用不同的键,而不是实际的文件路径名称-可能是哈希表示,以节省内存)。例如:

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'), ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'), ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),  ...]

您也可以将其重新组合成一系列行:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

注:上述代码为IT技术相关内容,涉及Spark编程语言。

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

或者将整个文件重新组合成单个字符串(在此示例中,结果与从wholeTextFiles获取的结果相同,但是filepathing中的字符串“file:”已被剥离):

Spark_Full.groupByKey()。map(lambda x:(x [0],“”。join(list(x [1]))))。collect()


当我运行这行代码 - Spark_Full += sc.textFile(filename).keyBy(lambda x: filename) 时,我遇到了错误,即 TypeError: 'PipelinedRDD' object is not iterable。我的理解是,该行代码创建了一个 RDD,该 RDD 是不可变的,所以我想知道您是如何将其追加到另一个变量中的? - KartikKannapur

3
你可以使用以下方法:
JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

在这里,您将获得文件的路径和文件内容。因此,您可以一次执行整个文件的任何操作,从而节省开销。


1
有一种简单明了的解决方案可用。使用wholeTextFiles()方法。它将采取一个目录并形成键值对。返回的RDD将是一对RDD。 下面是从Spark文档中找到的描述:SparkContext.wholeTextFiles允许您读取包含多个小文本文件的目录,并将它们作为(文件名、内容)对返回。这与textFile不同,后者将在每个文件的每行中返回一条记录。

1

使用sc.textFile方法的所有答案都是正确的。

我只是想知道为什么不使用wholeTextFiles方法。例如,在这种情况下...

val minPartitions = 2
val path = "/pathtohdfs"
    sc.wholeTextFiles(path,minPartitions)
      .flatMap{case (path, text) 
    ...

一个限制是,我们必须加载小文件,否则性能会变差,可能会导致OOM。

注意:

  • 整个文件应该适合内存
  • 适用于不能按行分割的文件格式,例如XML文件

更多参考请访问


或者只需使用sc.wholeTextFiles(文件夹)。flatMap ... - Evhz
sc.wholeTextFiles(“/path/to/dir”) - Ram Ghadiyaram

-1

尝试这个 接口用于将 DataFrame 写入外部存储系统(例如文件系统、键值存储等)。使用 DataFrame.write() 来访问这个接口。

从版本1.4开始新增。

csv(path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=None) 以指定的路径将 DataFrame 的内容保存为 CSV 格式。

参数: path - Hadoop 支持的任何文件系统中的路径 mode - 当数据已经存在时,指定保存操作的行为。

追加:将此DataFrame的内容追加到现有数据中。 覆盖:覆盖现有数据。 忽略:如果数据已经存在,则静默忽略此操作。 错误(默认情况):如果数据已经存在,则引发异常。 compression - 保存到文件时要使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、bzip2、gzip、lz4、snappy和deflate)。 sep - 设置单个字符作为每个字段和值的分隔符。如果未设置None,则使用默认值“,”。 quote - 设置用于转义引用值的单个字符,其中分隔符可能是值的一部分。如果未设置None,则使用默认值“”。如果您想关闭引用,您需要设置一个空字符串。 escape - 设置用于在已经引用的值内转义引号的单个字符。如果未设置None,则使用默认值“\”。 escapeQuotes - 表示是否应始终将包含引号的值括在引号中的标志。如果未设置None,则使用默认值true,转义所有包含引号字符的值。 quoteAll - 表示是否始终应将所有值括在引号中的标志。如果未设置None,则使用默认值false,仅转义包含引号字符的值。 header - 将列名作为第一行写入。如果未设置None,则使用默认值false。 nullValue - 设置null值的字符串表示形式。如果未设置None,则使用默认值空字符串。 dateFormat - 设置表示日期格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型。如果未设置None,则使用默认值yyyy-MM-dd。 timestampFormat - 设置表示时间戳格式的字符串。自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳类型。如果未设置None,则使用默认值yyyy-MM-dd'T'HH:mm:ss.SSSZZ。


-4
rdd = textFile('/data/{1.txt,2.txt}')

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接