Now I want to iterate over each entry so that I can do something with each file name and its content. I set up the RDD like this:
val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
However, I can't seem to find any documentation on how to do this.
So what I want is something like:
foreach occurrence-in-the-rdd {
  // do stuff with the array found at location n of the RDD
}
import org.apache.spark.{SparkConf, SparkContext}

// set up an example -- an RDD of arrays
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// Print the RDD of arrays.
testRDD.collect().foreach(a => println(a.size))

// Use map() to create an RDD with the array sizes.
val countRDD = testRDD.map(a => a.size)

// Print the elements of this new RDD.
countRDD.collect().foreach(a => println(a))

// Use filter() to create an RDD with just the longer arrays.
val bigRDD = testRDD.filter(a => a.size > 3)

// Print each remaining array.
bigRDD.collect().foreach(a => {
  a.foreach(e => print(e + " "))
  println()
})
Note that the functions you write take a single RDD element as input and return data of some uniform type, so you create an RDD of that latter type. For example, countRDD is an RDD[Int], while bigRDD is still an RDD[Array[Int]].
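A small sketch (mine, not part of the original answer) that makes those element types explicit, assuming the testRDD from the example above:

import org.apache.spark.rdd.RDD

val countRDD: RDD[Int] = testRDD.map(a => a.size)             // one Int per input element
val bigRDD: RDD[Array[Int]] = testRDD.filter(a => a.size > 3) // filter keeps the element type unchanged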
At some point you may be tempted to write a foreach that modifies some other data, but you should resist that temptation for the reasons described in this question and answer.
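For instance (my sketch, not from the original answer; it assumes the testRDD above and Spark 2.x's sc.longAccumulator, with sc.accumulator playing the same role in older releases): a local variable mutated inside foreach is only updated in each executor's copy of the closure, so the driver may never see the change, whereas accumulator updates are merged back to the driver.

var total = 0
testRDD.foreach(a => total += a.size)   // each executor mutates its own copy of `total`
println(total)                          // on a cluster this is likely still 0 on the driver

val acc = sc.longAccumulator("total elements")
testRDD.foreach(a => acc.add(a.size))   // accumulator updates are sent back to the driver
println(acc.value)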
Edit: don't try to print large RDDs
Several readers have asked about using collect() and println() to see their results, as in the example above. Of course, this only works when running in an interactive mode such as the Spark REPL (read-eval-print loop). It is best to call collect() to bring the RDD down to one sequential array for orderly printing. But collect() may bring back too much data, and in any case too much may get printed. Here are some alternative ways to get insight into your RDDs if they are large:
RDD.take(): This gives you fine control on the number of elements you get but not where they came from -- defined as the "first" ones which is a concept dealt with by various other questions and answers here.
// take() returns an Array so no need to collect()
myHugeRDD.take(20).foreach(a => println(a))
RDD.sample(): This lets you (roughly) control the fraction of results you get, whether sampling uses replacement, and even optionally the random number seed.
// sample() does return an RDD so you may still want to collect()
myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
RDD.takeSample(): This is a hybrid: using random sampling that you can control, but both letting you specify the exact number of results and returning an Array.
// takeSample() returns an Array so no need to collect()
myHugeRDD.takeSample(true, 20).foreach(a => println(a))
RDD.count(): Sometimes the best insight comes from how many elements you ended up with -- I often do this first.
println(myHugeRDD.count())
Is collect() feasible when dealing with large amounts of data? - deFreitas
You can use collect(), but as in my exchange with @cpu_meltdown, it is best to be clear about what you are using it for. In the answer above I use it to print diagnostic information to the console, which of course you would not do with a large RDD in production. I have edited the answer to try to make clear that collect() is only for diagnostics. - Spiro Michaylov

What you want here are map and filter:
val txtRDD = someRDD filter { case(id, content) => id.endsWith(".txt") }
txtRDD now contains only the files whose names end in ".txt". To word count those documents:
//split the documents into words in one long list
val words = txtRDD flatMap { case (id,text) => text.split("\\s+") }
// give each word a count of 1
val wordsT = words map (x => (x,1))
//sum up the counts for each word
val wordCount = wordsT reduceByKey((a, b) => a + b)
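If you then want to peek at the most frequent words without collecting the whole result, a small sketch (mine, not part of the original answer) using the wordCount RDD above:

// take the ten most frequent words without collecting the whole RDD
wordCount.sortBy({ case (_, count) => count }, ascending = false)
  .take(10)
  .foreach { case (word, count) => println(word + ": " + count) }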
You will want to use the mapPartitions function when you have some expensive initialization to perform, for example when doing named entity recognition with a library like the Stanford CoreNLP tools.
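A minimal sketch of that pattern (mine, not from the original answer), with a hypothetical ExpensiveAnalyzer standing in for something like a CoreNLP pipeline, applied to the txtRDD defined above:

// ExpensiveAnalyzer is a hypothetical stand-in for a costly-to-construct NLP pipeline.
class ExpensiveAnalyzer extends Serializable {
  // placeholder "analysis": treat capitalized tokens as entities
  def entities(text: String): Array[String] =
    text.split("\\s+").filter(w => w.headOption.exists(_.isUpper))
}

val analyzed = txtRDD.mapPartitions { iter =>
  val analyzer = new ExpensiveAnalyzer()   // built once per partition, not once per record
  iter.map { case (id, text) => (id, analyzer.entities(text)) }
}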
If you have mastered map, filter, flatMap, and reduce, then you have already mastered the core of Spark.

I would suggest using a partition mapping function. The code below shows how an entire RDD dataset can be processed in a loop so that each input is handled by the same function. I am not familiar with Scala, so I can only offer Java code; converting it to Scala should not be difficult, though.
JavaRDD<String> res = file.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
    @Override
    public Iterable<String> call(Iterator<String> t) throws Exception {
        ArrayList<String> tmpRes = new ArrayList<>();
        while (t.hasNext()) {
            // advance the iterator -- without t.next() this loop would never terminate
            String input = t.next();
            // do whatever per-element work you need here; this sketch just passes the value through
            tmpRes.add(input);
        }
        return tmpRes;
    }
}).cache();
wholeTextFiles returns a pair RDD:

def wholeTextFiles(path: String, minPartitions: Int): RDD[(String, String)]

Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. Each file is read as a single record and returned as a key-value pair, where the key is the path of the file and the value is its content.
Here is an example that reads the files under a local path and prints each file name and its content:
val conf = new SparkConf().setAppName("scala-test").setMaster("local")
val sc = new SparkContext(conf)
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.collect
.foreach(t => println(t._1 + ":" + t._2));
file:/Users/leon/Documents/test/1.txt:{"name":"tom","age":12}
file:/Users/leon/Documents/test/2.txt:{"name":"john","age":22}
file:/Users/leon/Documents/test/3.txt:{"name":"leon","age":18}
If you only need the file contents, you can map each pair to its value:
sc.wholeTextFiles("file:///Users/leon/Documents/test/")
.map(t => t._2)
.collect
.foreach { x => println(x)}
{"name":"tom","age":12}
{"name":"john","age":22}
{"name":"leon","age":18}
I think that, for small files, wholeTextFiles is the more appropriate choice.
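For contrast, a quick sketch (mine) of the difference from sc.textFile on the same directory: textFile yields one record per line and loses the file names, while wholeTextFiles yields one (path, content) pair per file, which is what makes it convenient for many small files.

// one record per line, pooled across all files; the file names are lost
val lines = sc.textFile("file:///Users/leon/Documents/test/")
// one record per file: (full path, entire content) -- handy when files are small
val files = sc.wholeTextFiles("file:///Users/leon/Documents/test/")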
for (element <- YourRDD) {
  // do what you want with element in each iteration; if you want the index of the element,
  // you can keep a counter variable starting from 0 in this loop (see the caveat below)
  println(element._1)  // this will print all the file names
}
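Note that this for loop desugars to YourRDD.foreach, so the body runs on the executors and a counter mutated inside it only behaves predictably in local mode. If you really do need an index for each element, a small sketch (mine) of the distributed-safe alternative using zipWithIndex:

// zipWithIndex pairs every element with a stable Long index, computed across partitions
YourRDD.zipWithIndex().foreach { case ((filename, _), index) =>
  println(index + ": " + filename)
}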