在Spark SQL数据框中压缩和展开多个列

3

我可以帮您翻译以下内容,这是有关IT技术的。您需要一个结构如下的数据框:

A: Array[String]   | B: Array[String] | [ ... multiple other columns ...]
=========================================================================
[A, B, C, D]       | [1, 2, 3, 4]     | [ ... array with 4 elements ... ]
[E, F, G, H, I]    | [5, 6, 7, 8, 9]  | [ ... array with 5 elements ... ]
[J]                | [10]             | [ ... array with 1 element ...  ]

我想编写一个UDF,它可以:

  1. 压缩DF中每列在第i个位置上的元素
  2. 将DF展开为这些压缩元组的每个部分

生成的列应该如下所示:

ZippedAndExploded: Array[String]
=================================
[A, 1, ...]
[B, 2, ...]
[C, 3, ...]
[D, 4, ...]
[E, 5, ...]
[F, 6, ...]
[G, 7, ...]
[H, 8, ...]
[I, 9, ...]
[J, 10, ...]

目前我正在使用一个多调用(每个列名一个调用,列名列表在运行时之前收集)到类似这样的UDF:

val myudf6 = udf((xa:Seq[Seq[String]],xb:Seq[String]) => {
  xa.indices.map(i => {
    xa(i) :+ xb(i) // Add one element to the zip column
  })
})

val allColumnNames = df.columns.filter(...)    

for (columnName <- allColumnNames) {
  df = df.withColumn("zipped", myudf8(df("zipped"), df(columnName))
}
df = df.explode("zipped")

由于数据框可能有数百列,这个迭代调用 withColumn 似乎需要很长时间。

问题:是否可以使用一个UDF和单个 DF.withColumn(...) 调用完成此操作?

重要提示:UDF应该压缩动态数量的列(在运行时读取)。


有没有想法如何在PySpark中实现? - linello
2个回答

3

使用一个接受可变列作为输入的UDF。这可以通过一个数组的数组来实现(假设类型相同)。由于你有一个数组的数组,所以可以使用transpose来实现与将列表压缩在一起相同的结果。然后可以对生成的数组进行展开。

val array_zip_udf = udf((cols: Seq[Seq[String]]) => {
  cols.transpose
})

val allColumnNames = df.columns.filter(...).map(col)
val df2 = df.withColumn("exploded", explode(array_zip_udf(array(allColumnNames: _*))))

请注意,在 Spark 2.4+ 版本中,可以使用 arrays_zip 替代 UDF
val df2 = df.withColumn("exploded", explode(arrays_zip(allColumnNames: _*)))

1
使用@Shaido的代码解决了这个问题(将最后一行更改为val df2 = df.withColumn("exploded", explode(array_zip_udf(array(columnNames.head, columnNames.tail : _*)))))。 - D. Müller
1
非常感谢你,你帮我节省了很多时间! - D. Müller
@D.Müller: 很高兴能帮助你 :) 我稍微修改了答案,以避免使用 head/tail 部分,这可以通过在列名中添加 map(col) 来实现。(我忘记了 array 只接受一个 Seq[Column] 而不是 Seq[String] 的事实。) - Shaido

0
如果你知道并确定数组中的值的数量,下面是一个较简单的解决方案之一。
select A[0], B[0]..... from your_table
union all
select A[1], B[1]..... from your_table
union all
select A[2], B[2]..... from your_table
union all
select A[3], B[3]..... from your_table

非常感谢您的快速回复。我已经调整了我的问题,因为数据框可能包含不同大小(按行)的数组。我猜这个解决方案在这样的数据上不起作用,对吗?对于问题的迟到更改,我感到抱歉。 - D. Müller

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接