Spark reduce函数:了解其工作原理

5

我正在学习课程

它说RDD上的reduce操作是逐台机器完成的。这意味着,如果您的数据分布在两台计算机上,那么下面的函数将在第一台计算机上处理数据,找到该数据的结果,然后它将从第二台计算机中获取单个值,运行函数,并继续这种方式,直到完成所有来自第二台机器的值。这正确吗?

我认为函数将同时开始在两台机器上操作,然后一旦它从两台机器获得结果,它将再次运行最后一次函数。

rdd1=rdd.reduce(lambda x,y: x+y)

更新 1--------------------------------------------

以下步骤是否比 reduce 函数更快获得答案?

Rdd=[3,5,4,7,4]
seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
collData.aggregate(0, seqOp, combOp)

更新2-----------------------------------

以下两组代码执行时间相同吗?我进行了检查,似乎它们都需要相同的时间。

import datetime

data=range(1,1000000000)
distData = sc.parallelize(data,4)
print(datetime.datetime.now())
a=distData.reduce(lambda x,y:x+y)
print(a)
print(datetime.datetime.now())

seqOp = (lambda x, y: x+y)
combOp = (lambda x, y: x+y)
print(datetime.datetime.now())
b=distData.aggregate(0, seqOp, combOp)
print(b)
print(datetime.datetime.now())
1个回答

3

reduce的行为在本地语言(Scala)和客户语言(Python)之间略有不同,但可以简化一下:

  • 每个分区按顺序逐个元素进行处理
  • 多个分区可以同时由单个工作线程(多个执行器线程)或不同的工作线程处理
  • 部分结果被提取到驱动程序,最终的缩减应用于此处(这是PySpark和Scala中实现方式不同的一部分)

由于您似乎正在使用Python,请看一下代码:

  1. reduce creates a simple wrapper for a user provided function:

    def func(iterator):
        ...
    
  2. This is wrapper is used to mapPartitions:

    vals = self.mapPartitions(func).collect()
    

    It should be obvious this code is embarrassingly parallel and doesn't care how the results are utilized

  3. Collected vals are reduced sequentially on the driver using standard Python reduce:

    reduce(f, vals)
    

    where f is a functions passed to RDD.reduce

相比之下,Scala会异步地合并来自工作节点的部分结果。

treeReduce的情况下,第三步也可以以分布式的方式执行。详见了解Spark中的treeReduce()

总结一下,reduce(除驱动程序端处理外)使用与基本转换(如mapfilter)完全相同的机制(mapPartitions),并提供相同级别的并行性(再次排除驱动程序代码)。如果您拥有大量分区或f操作很昂贵,可以使用tree*系列方法并行化/分发最终合并。


我已经阅读了您的回答。我很难理解您的输入,并且无法确定课程中所述的语句是否正确。根据“多个分区可以由单个工作器(多个执行线程)或不同的工作器同时处理”的说法,该语句似乎是不正确的。请直接回答问题。请使用示例突出您的观点-例如,RDD是[1,2,3,4,5,6],[1,2,3]在一台机器上,其余元素在另一台机器上... Spark和Scala如何分别处理它们?感谢您的工作。 - user2543622
我没有看过这个课程,所以我不能参考它,但如果他们真的告诉你这是机器完成的,那么你浪费了200美元。reduce,除了驱动程序部分外,使用的机制与标准Spark转换相同,因此表现出相同的并行性。 - zero323
请用一个例子来突出你的意思 - 例如,RDD是[1,2,3,4,5,6],其中[1,2,3]在一台机器上,其余元素在另一台机器上。Spark和Scala如何处理这些不同?此外,能否回答我更新后的问题? - user2543622
a) aggregatereduce之间不应该有显著的性能差异。 b) 我无法提供示例,因为通常顺序是不确定的。您可以在这里看到非常粗略的可视化效果(https://dev59.com/-5Pfa4cB1Zd3GeqPG7jJ),但从根本上讲,操作是不同步的。 c) 关于Scala - 正如我已经说过的那样 - Scala通过异步方式获取任务结果,而不是通过收集。 - zero323
@zero323 从您的回答中,我理解reduce最终也是在驱动程序中执行的,我理解正确吗?谢谢。 - jack

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接