在Spark中使用Scala无需聚合转置DataFrame

22

我在网上查找了许多不同的解决方案,但没有找到我想要实现的内容。请帮助我。

我正在使用Scala的Apache Spark 2.1.0。以下是我的数据框架:


+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1       | val1  |
|col2       | val2  |
|col3       | val3  |
|col4       | val4  |
|col5       | val5  |
+-----------+-------+

我希望将这个转置为以下内容:


+-----+-------+-----+------+-----+
|col1 | col2  |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2  |val3 | val4 |val5 |
+-----+-------+-----+------+-----+

2
如果两个记录具有相同的COLUMN_NAME但不同的VALUE,那该怎么办?那么值应该是什么?如果您知道没有这样的重复项,则数据框架要么非常小(在这种情况下,您可以只需收集它并使用普通Scala进行转换),要么结果将具有太多列。 - Tzach Zohar
两个记录永远不会有相同的列名。实际上,我正在获取以多行形式传入的表格插入/更新详细信息,其中一列是列名,另一列是值,我的计划是将它们转置为数据框并直接更新到Kudu数据库中。第一列的值作为模式,发送列值作为值。因此,我需要从中构建数据框。如果您有任何其他建议/想法,请告诉我。 - Maruti K
4个回答

19
你可以使用 pivot 来实现,但你仍然需要进行聚合操作。但如果你的 COLUMN_NAME 有多个 value 值呢?
val df = Seq(
  ("col1", "val1"),
  ("col2", "val2"),
  ("col3", "val3"),
  ("col4", "val4"),
  ("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")

df
  .groupBy()
  .pivot("COLUMN_NAME").agg(first("VALUE"))
  .show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+

编辑:

如果你的数据框像你的示例一样小,你可以将其收集为Map:

val map = df.as[(String,String)].collect().toMap

然后应用这个答案


非常感谢您的快速回复!非常感激!它起作用了:)。唯一的问题是由于数据透视表,速度很慢。 - Maruti K
嘿 Raphael,我知道我们也可以使用 Map 来做这个,但是我无法达到结果。如果你有使用 Map 的逻辑,请分享一下。 - Maruti K
@MarutiK 只需先在您的映射上调用 toSeq,然后应用我的答案。 - Raphael Roth
我能够使用toSeq,但是在对空值进行groupBy()时失败了。我收到的错误是“<console>:46: error: not enough arguments for method groupBy: (f: ((String, String)) => K)scala.collection.immutable.Map[K,Seq[(String, String)]]”。请问有什么建议吗? - Maruti K
@RaphaelRoth 感谢您的回答。如果只有一列呢?我尝试了这个解决方案,但是会多出一行,我该如何去掉它? - Kuldeep Jain

12

如果你的数据框像问题中那样足够小,则可以收集COLUMN_NAME以形成模式,收集VALUE以形成行,然后创建一个新的数据框

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
//creating schema from existing dataframe
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))
//creating RDD[Row] 
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
//new dataframe creation
sqlContext.createDataFrame(values, schema).show(false)

这应该会给你

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+

你太棒了,Ramesh!这正是我需要的。非常感谢你的帮助。从性能上讲,这比Pivot更好。 - Maruti K
很高兴听到这个消息,@MarutiK,请在您有资格时不要忘记点赞 ;) - Ramesh Maharjan
2
我们如何在pyspark中实现这个? - pnv

2

另一种解决方案是使用交叉表,虽然过程较长。

 val dfp = spark.sql(""" with t1 (
 select  'col1' c1, 'val1' c2  union all
 select  'col2' c1, 'val2' c2  union all
 select  'col3' c1, 'val3' c2  union all
 select  'col4' c1, 'val4' c2  union all
 select  'col5' c1, 'val5' c2
  )  select   c1  COLUMN_NAME,   c2  VALUE     from t1
""")
dfp.show(50,false)

+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1       |val1 |
|col2       |val2 |
|col3       |val3 |
|col4       |val4 |
|col5       |val5 |
+-----------+-----+

val dfp2=dfp.groupBy("column_name").agg( first($"value") as "value" ).stat.crosstab("value", "column_name")
dfp2.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |1   |0   |0   |0   |0   |
|val3             |0   |0   |1   |0   |0   |
|val2             |0   |1   |0   |0   |0   |
|val5             |0   |0   |0   |0   |1   |
|val4             |0   |0   |0   |1   |0   |
+-----------------+----+----+----+----+----+

val needed_cols = dfp2.columns.drop(1)

needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)

val dfp3 = needed_cols.foldLeft(dfp2) { (acc,x) => acc.withColumn(x,expr(s"case when ${x}=1 then value_column_name else 0 end")) }
dfp3.show(false)

+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1             |val1|0   |0   |0   |0   |
|val3             |0   |0   |val3|0   |0   |
|val2             |0   |val2|0   |0   |0   |
|val5             |0   |0   |0   |0   |val5|
|val4             |0   |0   |0   |val4|0   |
+-----------------+----+----+----+----+----+

dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+

0
为了增强Ramesh Maharjan的答案,需要先收集数据,然后将其转换为地图。
val mp = df.as[(String,String)].collect.toMap

通过一个虚拟数据框,我们可以使用foldLeft进一步构建

val f = Seq("1").toDF("dummy")

mp.keys.toList.sorted.foldLeft(f) { (acc,x) => acc.withColumn(mp(x),lit(x) ) }.drop("dummy").show(false)

+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接