将Spark DataFrame转换为Scala Map集合

15

我正在尝试找到将整个Spark数据帧转换为Scala Map集合的最佳解决方案。可以用下面的方式进行更好的说明:

从这里开始(在Spark示例中):

val df = sqlContext.read.json("examples/src/main/resources/people.json")

df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

对于这样表示的Scala集合(Map of Maps):

val people = Map(
Map("age" -> null, "name" -> "Michael"),
Map("age" -> 30, "name" -> "Andy"),
Map("age" -> 19, "name" -> "Justin")
)
3个回答

31

我认为你的问题没有意义——在你最外层的Map中,我只看到你试图将值塞入其中——你需要在最外层的Map中使用键/值对。话虽如此:

val peopleArray = df.collect.map(r => Map(df.columns.zip(r.toSeq):_*))

将会给你:

Array(
  Map("age" -> null, "name" -> "Michael"),
  Map("age" -> 30, "name" -> "Andy"),
  Map("age" -> 19, "name" -> "Justin")
)

那时候,您可以这样做:

val people = Map(peopleArray.map(p => (p.getOrElse("name", null), p)):_*)

这会为您提供:

Map(
  ("Michael" -> Map("age" -> null, "name" -> "Michael")),
  ("Andy" -> Map("age" -> 30, "name" -> "Andy")),
  ("Justin" -> Map("age" -> 19, "name" -> "Justin"))
)

我猜这可能是你真正想要的。如果你想按任意Long索引键入它们,可以这样做:

val indexedPeople = Map(peopleArray.zipWithIndex.map(r => (r._2, r._1)):_*)

这会给你:

Map(
  (0 -> Map("age" -> null, "name" -> "Michael")),
  (1 -> Map("age" -> 30, "name" -> "Andy")),
  (2 -> Map("age" -> 19, "name" -> "Justin"))
)

那个有效。我说错了。我只需要一组映射,第一行代码就给了我所需的一切。谢谢。 - Jimmy Hendricks

5

首先从数据框中获取模式

val schemaList = dataframe.schema.map(_.name).zipWithIndex//get schema list from dataframe

从数据帧中获取rdd并与其进行映射

dataframe.rdd.map(row =>
  //here rec._1 is column name and rce._2 index
  schemaList.map(rec => (rec._1, row(rec._2))).toMap
 ).collect.foreach(println)

1
val map =df.collect.map(a=>(a(0)->a(1))).toMap.asInstanceOf[Map[String,String]]

如果需要将结果存储在一个Map而不是数组中:


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接