如何将Scala RDD转换为Map

7
我有一个RDD(字符串数组)org.apache.spark.rdd.RDD[String] = MappedRDD[18],想把它转换成具有唯一ID的映射。我使用了'val vertexMAp = vertices.zipWithUniqueId',但这给了我另一个类型为'org.apache.spark.rdd.RDD[(String, Long)]'的RDD,但我想要一个'Map[String, Long]'。如何将我的'org.apache.spark.rdd.RDD[(String, Long)]'转换为'Map[String, Long]'?谢谢。
3个回答

26

PairRDDFunctions中有一个内置的collectAsMap函数,可以将RDD中的键值对转换为一个map。

val vertexMAp = vertices.zipWithUniqueId.collectAsMap

需要记住的是,RDD是一种分布式数据结构。您可以将其视为数据的“片段”分布在集群中。当您执行collect时,您强制所有这些数据片段都传输到驱动程序,并且为了做到这一点,它们需要适合驱动程序的内存。

从评论中看来,在您的情况下,您需要处理一个大型数据集。将其转换为Map不起作用,因为它不适合驱动程序的内存;如果您尝试,则会引发OOM异常。

您可能需要将数据集保留为RDD。如果您要创建Map以查找元素,则可以改为使用PairRDD上的lookup,如下所示:

import org.apache.spark.SparkContext._  // import implicits conversions to support PairRDDFunctions

val vertexMap = vertices.zipWithUniqueId
val vertixYId = vertexMap.lookup("vertexY")

1
如果您的左元组已经具有唯一值,那么您是否需要使用zipWithUniqueId? - alex9311
@maasg在RDD中,查找功能是否跨工作节点运行? - santhosh
@santhosh 是的。lookup 可以在完整分布式 RDD 上工作。 - maasg
嗨@alex9311,你找到了你的问题的答案吗? - mjbsgll

8

将数据收集到“本地”机器上,然后将Array [(String,Long)]转换为Map。

val rdd: RDD[String] = ???

val map: Map[String, Long] = rdd.zipWithUniqueId().collect().toMap

我的RDD有19123380条记录,当我运行val map: Map[String, Long] = rdd.zipWithUniqueId().collect().toMap时,会出现lang.OutOfMemoryError错误。有没有更好的方法来解决这个问题? - Soumitra
不。你可以使用NoSql存储(例如Cassandra)来加载你的RDD,并通过类似于Map的接口访问它。 - Eugene Zhulenev
嗨,尤金,你能否详细说明一下你的评论? - Soumitra
您可以使用 https://github.com/datastax/spark-cassandra-connector 将 RDD[(String, Long)] 保存为Cassandra表。稍后,可以通过按键查找快速使用它。 - Eugene Zhulenev

3

您无需进行转换。对于基于Two-Tuple的RDD,PairRDDFunctions的隐式检测会自动应用PairRDDFunctions方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接