Spark RDD是什么?它们如何工作?

21

我有一个小的Scala程序,在单节点上运行良好。但是,我正在将它扩展到多个节点上运行。这是我的第一次尝试。我只是想了解RDD在Spark中的工作原理,因此这个问题基于理论,可能不是100%正确。

假设我创建了一个RDD: val rdd = sc.textFile(file)

现在,一旦我这样做了,这是否意味着在文件路径file处的文件现在已经分区到所有节点上(假设所有节点都可以访问文件路径)?

其次,我想计算RDD中对象的数量(很简单),但是我需要在需要应用于RDD中对象的计算中使用该数字 - 以下是伪代码示例:

rdd.map(x => x / rdd.size)
假设有100个对象在`rdd`中,有10个节点,每个节点处理10个对象(这是假设RDD概念的工作方式),那么当我调用方法时,每个节点会使用`rdd.size`作为`10`还是`100`进行计算?因为总体上,RDD的大小为`100`,但在每个节点上它只有`10`。我需要在进行计算之前制作一个广播变量吗?最后,如果我对RDD进行转换,例如`rdd.map(_.split("-"))`,然后我想要新的RDD`size`,我是否需要对RDD执行一个操作,如`count()`,以便将所有信息发送回驱动程序节点?

1
这个问题与下面的问题相关联。 - gsamaras
我认为你的意思是 rdd.flatMap(_.split("-")) - lovasoa
2个回答

19
val rdd = sc.textFile(file)

那是否意味着文件现在已分割成多个节点的分区?

文件仍然在原地。结果为RDD [String]的元素是文件的行。 RDD 被分区以匹配底层文件系统的自然分区。分区数不取决于您拥有的节点数。

重要的是要理解,当执行此行时,它并不读取文件。RDD 是惰性对象,只有在必须执行操作时才会执行。这很棒,因为它避免了不必要的内存使用。

例如,如果您编写val errors = rdd.filter(line => line.startsWith("error")),仍然不会发生任何事情。如果您接下来写val errorCount = errors.count,则现在需要执行操作序列,因为count的结果是整数。每个工作核心(执行线程)将并行执行的操作是:读取文件(或文件片段),迭代其行,并计算以“error”开头的行数。除缓冲和 GC 外,每个核心同时只有一行在内存中。这使得可以处理非常大的数据而不使用太多内存。

我想计算 RDD 中对象的数量,但是我需要使用该数字进行计算,并且该计算需要应用于 RDD 中的对象 - 下面是一个伪代码示例:

rdd.map(x => x / rdd.size)
没有 rdd.size 方法。有 rdd.count,可以计算 RDD 中元素的数量。 rdd.map(x => x / rdd.count) 不能工作。代码将尝试将 rdd 变量发送到所有 worker,并且会失败并抛出 NotSerializableException 异常。你可以这样做:
val count = rdd.count
val normalized = rdd.map(x => x / count)

这个可以工作,因为count是一个Int类型,可以被序列化。

如果我对RDD进行转换,例如rdd.map(_.split("-")),然后我想知道RDD的新长度,我需要对RDD执行操作,例如 count(), 这样所有的信息都会被发送回Driver节点吗?

map并不改变元素数量。我不知道你所说的“长度”是什么意思。但是,是的,你需要执行类似于count的操作才能从RDD中获取任何信息。你知道,只有当你执行某个动作时,才会执行任何工作。(当你执行count时,只有每个分区的计数会被发送回Driver节点,当然不会返回“所有的信息”。)


我基于你在文档中的回答编写了一个 [tag:Python] 示例,如果你喜欢,可以将其包含在你的回答中! - gsamaras
这应该是被接受的答案。它完整且正确地回答了所有部分。 - tejaskhot

6
通常情况下,文件(如果文件过大则可能只是文件的一部分)会被复制到集群中的N个节点上(在HDFS中默认N=3)。这并不意味着每个可用节点都要分配到文件的一部分。
然而,对于您(即客户端)使用Spark处理文件应该是透明的——无论文件在多少个节点上被拆分和/或复制,您都不应该看到rdd.size有任何区别。在Hadoop中至少有方法可以找出当前文件(部分内容)位于哪些节点上。然而,在简单的情况下,您很可能不需要使用此功能。
更新:一篇描述RDD内部机制的文章:https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

谢谢回复。那么,对于像 rdd.filter(...).map(x => x * rdd.count) 这样的计算,filter 步骤是在任何节点执行 map 步骤之前执行的吗?因为显然,map 步骤依赖于已经在每个节点上执行了 filter 步骤,因为 map 包含 rdd.count。再次感谢。 - monster
当然,因为map是建立在filter之上的(请阅读文章中关于“血统”概念的介绍)。 - Ashalynd
谢谢提供这些信息,很有价值。不过,我现在想知道广播变量的目的是什么?再次感谢,非常感激! - monster
伯克利的链接现在已经失效了。 - Don Branson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接