如何在Spark DataFrame中将行转换为列,使用Scala实现。

4
有没有办法将数据框的行转置为列。 我有以下结构作为输入:
val inputDF = Seq(("pid1","enc1", "bat"),
                  ("pid1","enc2", ""),
                  ("pid1","enc3", ""),
                  ("pid3","enc1", "cat"),
                  ("pid3","enc2", "")
              ).toDF("MemberID", "EncounterID", "entry" )

inputDF.show:

+--------+-----------+-----+
|MemberID|EncounterID|entry|
+--------+-----------+-----+
|    pid1|       enc1|  bat|
|    pid1|       enc2|     |
|    pid1|       enc3|     |
|    pid3|       enc1|  cat|
|    pid3|       enc2|     |
+--------+-----------+-----+

expected result:

+--------+----------+----------+----------+-----+
|MemberID|Encounter1|Encounter2|Encounter3|entry|
+--------+----------+----------+----------+-----+
|    pid1|      enc1|      enc2|      enc3|  bat|
|    pid3|      enc1|      enc2|      null|  cat|
+--------+----------+----------+----------+-----+

请建议是否有任何优化的直接API可用于将行转置为列。

我的输入数据规模相当大,因此像collect这样的操作我无法执行,因为它会获取所有驱动程序上的数据。

我正在使用Spark 2.x


如果 entry 对于所有3个 EncounterID 都有值,那该怎么办?是否只能有3个 EncounterID - philantrovert
输入将只有一个值。是的,EncounterID是固定的,只会有3个EncounterID。 - Kalpesh
1
你确定这是你期望的结果吗?所有三个“遭遇”列始终具有相同的值... - Oli
遇到的值会改变。我只是举了一个例子来说明这个值。 - Kalpesh
仍然不确定你想要做什么,但我已经更新了我的答案。 - Oli
1个回答

7
我不确定您需要的是否就是您所询问的内容。但为了防止万一,这里提供一个想法:
val entries = inputDF.where('entry isNotNull)
    .where('entry !== "")
    .select("MemberID", "entry").distinct

val df = inputDF.groupBy("MemberID")
    .agg(collect_list("EncounterID") as "encounterList")
    .join(entries, Seq("MemberID"))
df.show
+--------+-------------------------+-----+
|MemberID|           encounterList |entry|
+--------+-------------------------+-----+
|    pid1|       [enc2, enc1, enc3]|  bat|
|    pid3|             [enc2, enc1]|  cat|
+--------+-------------------------+-----+

列表的顺序是不确定的,但您可以对其进行排序,然后使用.withColumn("Encounter1", sort_array($"encounterList")(0))从中提取新列...

其他想法

如果您想将条目的值放入相应的“Encounter”列中,可以使用数据透视表:

inputDF
    .groupBy("MemberID")
    .pivot("EncounterID", Seq("enc1", "enc2", "enc3"))
    .agg(first("entry")).show

+--------+----+----+----+
|MemberID|enc1|enc2|enc3|
+--------+----+----+----+
|    pid1| bat|    |    |
|    pid3| cat|    |    |
+--------+----+----+----+

添加 Seq("enc1", "enc2", "enc3") 是可选的,但由于您知道列的内容,这将加快计算速度。


抱歉,我无法硬编码数值,这将取决于列中存在哪些值。还有一件事我忘了添加...如果特定的memberID只有2行可用,则代码应该能够将第3列标记为null。我会更新问题。 - Kalpesh
如果未提供数据透视表中的值列表,Spark 将触发一个小型作业来检索它们。 - Oli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接