Apache Spark和Python Lambda

8
我有以下代码:
file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")

http://spark.apache.org/examples.html 我从这里复制了例子。

我不理解这段代码,特别是以下关键字:

  1. flatmap,
  2. map 和
  3. reduceby

请有人能以通俗易懂的语言解释一下这些关键字吗?


我不是专家,但我认为flatMap从嵌套结构(行词列表?)构建列表,map将函数应用于所有元素,reduceByKey按键(这里是相同的单词,我猜)分组元素并成对应用函数(这里是求和)。那可能会统计文本中每个单词的出现次数。 - user189
2
如果您使用函数式语言进行函数式编程,代码会变得更加简洁易读。例如,我强烈建议使用Scala而不是面向对象的脚本语言。Scala更加强大,在Spark上略微更具性能,并且使深入了解Spark代码更加容易。您的代码只需为:spark.textFile("hdfs://...").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).saveAsTextFile("hdfs://...") - samthebest
3个回答

16

map是最简单的,它基本上是对序列中的每个元素执行给定操作并返回结果序列(与foreach非常相似)。flatMap也是同样的操作,但不同的是,它允许您返回一个序列(可以为空),而不仅仅是每个元素返回一个元素。这里有一个回答解释了mapflatMap之间的区别。最后,reduceByKey使用聚合函数(意味着它接受两个相同类型的参数并返回该类型,还应该是可交换和结合的,否则会得到不一致的结果)对每个 (K,V) 对序列中的每个 V 进行聚合,针对每个 K

例子*
reduce (lambda a, b: a + b,[1,2,3,4])

这表示使用+聚合整个列表,因此将执行:

1 + 2 = 3  
3 + 3 = 6  
6 + 4 = 10  
final result is 10

按照键值对进行归约是同样的操作,只不过你需要针对每个唯一的键值对进行一次归约。


因此,以您的示例为例进行解释:

file = spark.textFile("hdfs://...") // open text file each element of the RDD is one line of the file
counts = file.flatMap(lambda line: line.split(" ")) //flatMap is needed here to return every word (separated by a space) in the line as an Array
             .map(lambda word: (word, 1)) //map each word to a value of 1 so they can be summed
             .reduceByKey(lambda a, b: a + b) // get an RDD of the count of every unique word by aggregating (adding up) all the 1's you wrote in the last step
counts.saveAsTextFile("hdfs://...") //Save the file onto HDFS

那么,为什么要用这种方式计算单词数量呢?原因是编程中的MapReduce模式高度可并行化,因此可以扩展到对数千兆甚至拍字节的数据进行此类计算。


我不常用Python,请告诉我是否有错误。


5

请见内联注释:

file = spark.textFile("hdfs://...") # opens a file
counts = file.flatMap(lambda line: line.split(" ")) \  # iterate over the lines, split each line by space (into words)
             .map(lambda word: (word, 1)) \ # for each word, create the tuple (word, 1)
             .reduceByKey(lambda a, b: a + b) # go over the tuples "by key" (first element) and sum the second elements
counts.saveAsTextFile("hdfs://...")

关于reduceByKey的更详细解释可以在这里找到:这里


抱歉,我不理解 reduceByKey。在普通的 lambda 表达式中,lambda a, b: a + b 的意思是对于输入的一对 (a,b),给我返回 a + b 的和作为结果,不是吗?但是这里使用了一种奇怪的语法,它做了一些其他的事情。 - jhon.smith
1
要理解reduceBykey,首先必须了解reduce。一个简单的reduce示例:print reduce(lambda a,b:a+b, [1,2,3])它迭代一个可迭代对象并将函数(第一个参数 - 在这里是lambda表达式)应用于前两个元素,然后使用结果与第三个元素等等。 - Nir Alfasi
我重新阅读了您的解释,真希望我也能给您加分。您的评论为我解决了reduceByKey的困惑。 - jhon.smith
@jhon.smith 很高兴我能帮到你,这里的积分没有意义(我不能用它们来买任何东西;) 干杯! - Nir Alfasi

1
这里的答案在代码层面上是准确的,但了解底层操作可能会有所帮助。
我的理解是,当调用reduce操作时,会发生大量的数据洗牌,结果是将由map()操作获得的所有具有相同键值的K-V对分配给一个任务,该任务对K-V对的集合中的值进行求和。然后将这些任务分配给不同的物理处理器,并通过另一个数据洗牌来汇总结果。
因此,如果map操作产生 (cat 1) (cat 1) (dog 1) (cat 1) (cat 1) (dog 1)
那么reduce操作将产生 (cat 4) (dog 2)
希望这可以帮到您。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接