Spark:将RDD转换为List

18
我有一个RDD结构。
RDD[(String, String)]

我想创建2个列表(一个用于rdd的每个维度)。我尝试使用rdd.foreach()并填充两个ListBuffer,然后将它们转换为Lists,但我猜每个节点都会创建自己的ListBuffer,因为在迭代之后,BufferLists是空的。我该怎么做?

编辑:我的方法

val labeled = data_labeled.map { line =>
  val parts = line.split(',')
  (parts(5), parts(7))
}.cache()

var testList : ListBuffer[String] = new ListBuffer()

labeled.foreach(line =>
  testList += line._1
)
  val labeledList = testList.toList
  println("rdd: " + labeled.count)
  println("bufferList: " + testList.size)
  println("list: " + labeledList.size)

结果是:

rdd: 31990654
bufferList: 0
list: 0

1
请更新您尝试过的代码、一些输入数据样本和期望的输出!您的问题对我来说不是很清楚。 - eliasah
2个回答

22

如果你真的想要创建两个列表 - 意思是,你希望所有分布式数据都被收集到驱动程序中(存在速度缓慢或OutOfMemoryError的风险) - 你可以使用collect,然后对结果使用简单的map操作:

val list: List[(String, String)] = rdd.collect().toList
val col1: List[String] = list.map(_._1)
val col2: List[String] = list.map(_._2)

或者,如果你想要将你的RDD “拆分”成两个RDDs,那么就可以通过不收集数据来实现类似的效果:

rdd.cache() // to make sure calculation of rdd is not repeated twice
val rdd1: RDD[String] = rdd.map(_._1)
val rdd2: RDD[String] = rdd.map(_._2)

第三种选择是先将数据映射到这两个RDD中,然后收集每一个,但这与第一种选项并没有太大区别,并且面临着相同的风险和限制。


@Yuriy,广播变量(只读)与此有何关系?你能详细描述一下吗? - avr
@avr ListBuffer 是可变的, += 操作会改变内部状态而不是创建新的引用。但你的问题很好,对于不可变语句(在任何操作中都会更改引用的情况),需要用某些东西(Serializable)来包装它。关于 List 的简单示例:val testList = sc.broadcast(new Serializable { var list = List.empty[String] }),之后就可以更改内部状态了。 - Yuriy
@Yuriy 我认为avr是正确的,而你误解了他/她的问题 - 这不是可变与不可变集合的问题 - 广播变量在某种意义上是“只读”的,如果它们的值在执行器上被更改,驱动程序代码将看不到这个更改(Spark如何聚合所有执行器所做的更改?)。这在本地模式下工作的事实看起来更像是一个错误,它在实际分布式集群中不起作用。 - Tzach Zohar
你说得对,我漏掉了这一点。我撤销了我的更改,抱歉给你带来困扰。 - Yuriy

2

作为Tzach Zohar答案的另一种选择,你可以在列表上使用unzip

scala> val myRDD = sc.parallelize(Seq(("a", "b"), ("c", "d")))
myRDD: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> val (l1, l2) = myRDD.collect.toList.unzip
l1: List[String] = List(a, c)
l2: List[String] = List(b, d)

或者在RDD中使用keysvalues:
scala> val (rdd1, rdd2) = (myRDD.keys, myRDD.values)
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at keys at <console>:33
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at values at <console>:33

scala> rdd1.foreach{println}
a
c

scala> rdd2.foreach{println}
d
b

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接