如何使用Spark SQL DataFrame和flatMap?

11

我正在使用Spark Scala API。我有一个Spark SQL DataFrame(从Avro文件中读取),其模式如下:

root
|-- ids: array (nullable = true)
|    |-- element: map (containsNull = true)
|    |    |-- key: integer
|    |    |-- value: string (valueContainsNull = true)
|-- match: array (nullable = true)
|    |-- element: integer (containsNull = true)

基本上是2列 [ ids: List[Map[Int, String]], match: List[Int] ]。类似以下示例数据:

[List(Map(1 -> a), Map(2 -> b), Map(3 -> c), Map(4 -> d)),List(0, 0, 1, 0)]
[List(Map(5 -> c), Map(6 -> a), Map(7 -> e), Map(8 -> d)),List(1, 0, 1, 0)]
...

我想做的是对每一行使用flatMap()函数来生成3列[id, property, match]。如果以上述两行作为输入数据,我们将得到:

[1,a,0]
[2,b,0]
[3,c,1]
[4,d,0]
[5,c,1]
[6,a,0]
[7,e,1]
[8,d,0]
...

然后按照属性(例如:a,b,...)groupByString来生成count(“property”)sum(“match”)

 a    2    0
 b    1    0
 c    2    2
 d    2    0
 e    1    1

我想要做类似这样的事情:

val result = myDataFrame.select("ids","match").flatMap( 
    (row: Row) => row.getList[Map[Int,String]](1).toArray() )
result.groupBy("property").agg(Map(
    "property" -> "count",
    "match" -> "sum" ) )

问题在于flatMap将DataFrame转换为RDD。有没有一种好的方法可以使用DataFrames进行flatMap类型的操作,然后再进行groupBy呢?

3个回答

17

flatMap的作用是将每个输入行转换为0个或多个行,并可以过滤它们或添加新的行。在SQL中,要获得相同的功能,您可以使用join。您能否使用join来实现您想要的功能?

另外,您还可以查看Dataframe.explode,这只是join的一种特殊形式(您可以通过将DataFrame连接到UDF来轻松创建自己的explode)。explode接受单个列作为输入,并允许您将其拆分或转换为多个值,然后将原始行与新行重新join。所以:

user      groups
griffin   mkt,it,admin

可以变成:

user      group
griffin   mkt
griffin   it
griffin   admin

因此,我建议看一下DataFrame.explode,如果这不容易解决问题,尝试使用UDFs进行连接。


谢谢您的回答!DataFrame.explode方法正是我在寻找的。 - Yuri Brovman

1

我的 MySQL 有点生疏,但一个选项是在你的 flatMap 中生成 Row 对象列表,然后你可以将结果 RDD 转换回 DataFrame。


0
`myDataFrame.select(explode('ids as "ids",'match).
select( 'ids, explode('match as "match").
map ( r => {
val e=r.getMap[Int,String](0).head
(e._1,e._2,r.getInt(1))
}
)`

groupby ... 可以在之后运行


仅有代码而没有任何解释很少有帮助。Stack Overflow 是关于学习的,而不是提供盲目复制粘贴的片段。请编辑您的问题并解释它如何回答特定的问题。请参阅 如何回答 - Sfili_81

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接