Spark的RDD中更新值的高效方法是什么?

11
我正在使用 ScalaSpark 编写一个与图相关的程序。数据集有 400 万个节点和 400 万条边(您可以将其视为一棵树),但每次迭代,我只编辑其中的一部分,即以给定节点为根的子树以及该给定节点和根之间路径上的节点。
每次 Iteration 之间有依赖关系,也就是说,i+1 Iteration 需要来自 i 的结果。因此,我需要存储每个 Iteration 的结果以进行下一步操作。
我试图找到一种有效的更新 RDD 的方法,但目前还没有头绪。我发现 PairRDD 有一个 lookup 函数,它可以将计算时间从 O(N) 减少到 O(M),其中 N 表示 RDD 中对象的总数,M 表示每个分区中元素的数量。
因此,我在思考是否有办法以 O(M) 或更理想的 O(1) 更新 RDD 中的对象?(我在 Spark 的邮件列表中看到有人说可以修改 lookup 以实现 O(1))
另外一个问题是,如果我能够实现 O(M) 更新 RDD,那么我是否可以将分区数增加到大于我拥有的 CPU 核心数量,并实现更好的性能表现?

1
RDD是不可变的,你只能通过转换创建一个新的RDD,但不能更新它。 - cloud
@cloud 谢谢您的评论,这是否意味着我需要创建一个全新的 RDD 而不是仅仅一个分区? - bxshi
1
就这样。我会写一篇答案来详细解释它。 - cloud
3个回答

6
作为功能数据结构,RDD是不可变的,并且对RDD进行的操作会生成一个新的RDD。
结构的不可变性并不一定意味着完全复制。持久化数据结构是一种常见的函数模式,其中对不可变结构的操作会产生一个新结构,但先前版本会被保留并经常被重用。
GraphX(Spark上的“模块”)是一个在Spark之上使用这种概念的图形API:从文档中可以看出:
对图形的值或结构的更改通过生成具有所需更改的新图来完成。请注意,在新图中重用了原始图的相当大部分(即未受影响的结构、属性和索引),从而降低了这种内在功能数据结构的成本。
这可能是手头问题的解决方案:http://spark.apache.org/docs/1.0.0/graphx-programming-guide.html

是的,它们被重复使用,但您仍然需要遍历所有元素来创建新对象。 - bxshi
当你说“我正在尝试找到一种有效的方式来更新RDD”时,我认为你指的是现场突变。你是在谈论查找吗? - maasg
@massg 嗯,我确实想谈论更新RDD,但是我在“迭代”的定义上犯了一个错误。当您执行映射或其他操作以创建新的RDD时,这些操作确实具有并行性,但仍需要访问旧RDD中的所有元素。 - bxshi
@bxshi 确实不可能。RDD只是一系列转换,你唯一可能改变的就是源数据。(或者入侵缓存,但我不建议这样做) - maasg

5
一个RDD是一个分布式数据集,一个分区是RDD存储的单位,处理RDD的单位是元素。
例如,您将一个大文件从HDFS读取为一个RDD,那么这个RDD的元素是字符串(该文件中的行),并且Spark通过分区在整个集群中存储此RDD。对于您作为Spark用户来说,您只需要关心如何处理该文件的行,就像编写普通程序并逐行从本地文件系统读取文件一样。这就是Spark的强大之处:)
无论如何,您不知道哪些元素将存储在某个分区中,因此更新特定分区是没有意义的。

所以根据您和maasg的答案,我应该将RDD视为普通对象,不要试图在较低级别上“调整”性能,因为框架会为我完成这项工作,并且使用对象重用创建新的RDD(基本上只是迭代并替换一些对象)并不像我想象的那么慢? - bxshi
4
RDD对象本身不贵,但其中的数据是昂贵的。例如,您编写一个应用程序:数据源 -> rdd1 -> rdd2 -> rdd3 -> 获取结果。实际上,Spark会记住您的转换t1、t2、t3,并将这些转换应用于数据源以获得结果。除非调用“RDD.cache()”,否则Spark不会保留RDD的数据。 - cloud
@cloud: 这是否意味着,在任何给定时间只会存在一个RDD? - Shankar
@cloud,我问这个问题是因为,如果我们有多个RDD,它会占用更多的磁盘空间/内存,对吗? - Shankar

2
MapReduce编程模型(和FP)并不真正支持单个值的更新。相反,应该定义一系列转换。
当您拥有相互依存的值时,即不能使用简单的map执行转换,而需要聚合多个值并基于该值进行更新时,您需要考虑一种将这些值分组然后转换每个组的方法 - 或者定义一个单子操作,使操作可以分布和分解为子步骤。
现在我会尝试更具体地针对您的特定情况。您说您有子树,是否可能首先将每个节点映射到指示相应子树的键?如果是这样,您可以做如下操作: nodes.map(n => (getSubTreeKey(n), n)).grouByKey().map ... (严格来说,您需要一个可交换的单子)最好阅读http://en.wikipedia.org/wiki/Monoid#Commutative_monoid
例如,+ 是一种幺半群操作,因为当一个人想要计算一个 Ints 的 RDD 的总和时,底层框架可以将数据分成块,在每个块上执行求和,然后在结果总和中进行求和(可能不止两步)。如果您能找到一个幺半群,最终可以产生与单个更新所需的相同结果,则可以分发处理。例如:nodes.reduce(_ myMonoid _)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接