如何在Apache Spark(Scala)中迭代RDD?

24
我使用下面的命令来填充一个RDD,其中包含一堆包含2个字符串["文件名","内容"]的数组。
现在我想要迭代每个出现的元素,以便对每个文件名和内容执行某些操作。
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

然而,我似乎找不到任何关于如何做到这一点的文档。

所以我想要的是:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD - Malcolm
5个回答

27
你可以在RDD上调用各种接受函数作为参数的方法。
// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })
}

注意,您编写的函数只接受单个RDD元素作为输入,并返回一些统一类型的数据,因此您创建了一个后一种类型的RDD。例如,countRDD是一个RDD[Int],而bigRDD仍然是一个RDD[Array[Int]]

在某些时候,您可能会想编写修改其他数据的foreach语句,但由于此问题和答案中所述的原因,您应该抵制这种诱惑。

编辑:不要尝试打印大型RDD

有几位读者询问如何使用collect()println()来查看他们的结果,就像上面的示例一样。当然,这仅适用于在Spark REPL(读取-评估-打印-循环)等交互模式下运行时。最好调用collect()来获取RDD的顺序数组以进行有序打印。但是,collect()可能会带回过多的数据,并且无论如何都会打印太多内容。以下是一些获取大型RDD洞察力的替代方法:

  1. RDD.take(): This gives you fine control on the number of elements you get but not where they came from -- defined as the "first" ones which is a concept dealt with by various other questions and answers here.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    
  2. RDD.sample(): This lets you (roughly) control the fraction of results you get, whether sampling uses replacement, and even optionally the random number seed.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    
  3. RDD.takeSample(): This is a hybrid: using random sampling that you can control, but both letting you specify the exact number of results and returning an Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    
  4. RDD.count(): Sometimes the best insight comes from how many elements you ended up with -- I often do this first.

    println(myHugeRDD.count())       
    

不会存在不同的工作线程以不协调的方式打印并混淆它们的行吗?如果它仍然可以正常工作,这是否有记录? - Eric O. Lebigot
1
是的,我有些粗心了——我刚刚编辑了我的答案,在打印RDD时添加了collect()调用。 - Spiro Michaylov
不错,提供了完整的示例,甚至展示了sc的设置! - JimLohse
在生产环境中,使用 collect 处理大量数据是否可行? - deFreitas
一般来说,我会说不需要使用collect(),但是正如我与@cpu_meltdown交流的内容一样,最好明确你要用它来做什么。在上面的答案中,我将其用于将诊断信息打印到控制台,但是在生产中当然不会使用大的RDD执行此操作。我已经编辑了答案,试图表明collect()仅用于诊断。 - Spiro Michaylov
显示剩余3条评论

9
Spark中的基本操作是mapfilter
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
txtRDD现在只包含扩展名为".txt"的文件。
如果您想对这些文件进行词频统计,可以使用以下命令:
//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordT = words map (x => (x,1))  
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)

你想使用mapPartitions函数来处理一些昂贵的初始化操作,例如使用像Stanford coreNLP工具这样的库进行命名实体识别。
如果掌握了mapfilterflatMapreduce,那么就已经掌握了Spark的核心内容。

2

我建议使用分区映射函数。下面的代码展示了如何在循环中处理整个RDD数据集,以便每个输入都通过相同的函数进行处理。我不熟悉Scala,所以我只能提供Java代码。但是,将其转换为Scala应该并不难。

JavaRDD<String> res = file.mapPartitions(new FlatMapFunction <Iterator<String> ,String>(){ 
      @Override
      public Iterable<String> call(Iterator <String> t) throws Exception {  

          ArrayList<String[]> tmpRes = new ArrayList <>();
          String[] fillData = new String[2];

          fillData[0] = "filename";
          fillData[1] = "content";

          while(t.hasNext()){
               tmpRes.add(fillData);  
          }

          return Arrays.asList(tmpRes);
      }

}).cache();

这个实际上是并行运行的吗?我认为在Spark中,只要在匿名函数中调用for或while循环,它就会在单个执行器中按顺序运行。 - JimLohse
@JimLohse 是的,它确实可以。 - thebluephantom
@thebluephantom 对不起让你困惑了,你的意思是它是并行运行还是顺序运行?谢谢,这是三年前的事情,可能是在Spark 1.6上,自2016年末以来我就没有再使用过它。 - JimLohse
它可以并行运行,也可以同时进行。 - thebluephantom

1

wholeTextFiles返回的是一个Pair RDD:

def wholeTextFiles(path: String, minPartitions: Int): RDD[(String, String)]

从HDFS、本地文件系统(在所有节点上都可用)或任何Hadoop支持的文件系统URI中读取文本文件目录。每个文件被读取为单个记录,并以键值对形式返回,其中键是每个文件的路径,值是每个文件的内容。

以下是读取本地路径下的文件并打印每个文件名和内容的示例:

val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .collect
  .foreach(t => println(t._1 + ":" + t._2));

结果为:
file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}

file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}

file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}

或者先将Pair RDD转换为RDD
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
  .map(t => t._2)
  .collect
  .foreach { x => println(x)}

结果为:
{"name":"tom","age":12}

{"name":"john","age":22}

{"name":"leon","age":18}

我认为对于小文件,wholeTextFiles 更加符合规范。


0
for (element <- YourRDD)
{
     // do what you want with element in each iteration, and if you want the index of element, simply use a counter variable in this loop beginning from 0 
     println (element._1) // this will print all filenames
}

2
请在这个答案中添加一些细节,比如一个更完整的例子,目前这个答案不是很好。 - Preston
每个项目将在for循环中使用element._1作为文件名,element._2作为内容。 - AliSafari186

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接