如何将两个RDD合并为一个RDD

3

帮忙,我有两个RDD,想要合并成一个RDD。以下是我的代码。

val us1 = sc.parallelize(Array(("3L"), ("7L"),("5L"),("2L")))
val us2 = sc.parallelize(Array(("432L"), ("7123L"),("513L"),("1312L")))

1
你期望的输出是什么?你尝试了什么? - mtoto
1
3L 7L 5L 2L 432L 7123L 513L 1312L - Simon
我想要这个RDD,意味着两个RDD合并成一个RDD。 - Simon
val newrdd = us1.++(us2) - Simon
2个回答

11

只需使用union:

val merged = us1.union(us2)

文档在这里

Scala中的快捷方式是:

val merged = us1 ++ us2

1
@Simon【请点赞或接受答案,而不是留下感谢评论】(http://stackoverflow.com/help/someone-answers) - evan.oman

5
您需要使用 RDD.union,这些不需要基于键进行连接。 Union 本身并没有实际作用,因此它的开销很低。请注意,合并后的 RDD 将具有原始 RDD 的所有分区,因此您可能需要在联合后进行 coalesce 操作以减少分区数。
val x = sc.parallelize(Seq( (1, 3), (2, 4) ))
val y = sc.parallelize(Seq( (3, 5), (4, 7) ))
val z = x.union(y)
z.collect
res0: Array[(Int, Int)] = Array((1,3), (2,4), (3,5), (4,7))

API

def++(other: RDD[T]): RDD[T]

返回此RDD和另一个RDD的并集。

def++ API

def union(other: RDD[T]): RDD[T]

返回此RDD和另一个RDD的并集。任何相同的元素将出现多次(使用.distinct()来消除它们)。

def union API


为什么要在之后进行合并?如果两个输入RDD被正确地分区,那么联合RDD也将是如此。 - Tim
仅为了性能和更新分区。这不是强制性的,但可以执行。它返回一个新的RDD,该RDD被缩减为numPartitions个分区。 - Indrajit Swain
我明白coalesce的作用。但是,如果你的输入RDD的分区大小正确,执行coalesce将会产生过大的分区(特别是如果你使用shuffle = false选项)。 - Tim
如果分区正确完成,那么一切都很好。你的代码可以运行 :) - Indrajit Swain

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接