在Spark中将多个列追加到现有数据框中

4
我需要将多列添加到现有的Spark DataFrame中,其中列名在列表中给出,假设新列的值是固定的。例如,给定输入列和数据框如下:
val columnsNames=List("col1","col2")
val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4))

在追加两列后,假设col1的常量值为"val1",col2的常量值为"val2",输出的数据框应该是:

+-----+---+-------+------+
|   _1| _2|col1   |col2|
+-----+---+-------+------+
|  one|  1|val1   |val2|
|  two|  2|val1   |val2|
|three|  3|val1   |val2|
| four|  4|val1   |val2|
+-----+---+-------+------+

我已经编写了一个用于追加列的函数。
def appendColumns (cols: List[String], ds: DataFrame): DataFrame = {

            cols match {

                case Nil => ds
                case h :: Nil => appendColumns(Nil, ds.withColumn(h, lit(h)))
                case h :: tail => appendColumns(tail, ds.withColumn(h, lit(h)))

            }
        }

有更好、更实用的方法来做这件事吗?

谢谢。


只是为了澄清,在 appendColumns 中,列名与列值相同,而在预期的输出数据框中,例如 col1 的值为 val1,它们可以相同(列名和值),还是您希望它们分开? - Shaido
列名和列值可以相同。 - nat
奇怪的关闭原因。 - thebluephantom
你好,你找到了你的问题的答案吗?还是有什么不清楚的地方吗? - Oli
谢谢Oli,是的,建议的方法非常好。 - nat
2个回答

6

是的,有一种更好、更简单的方法。基本上,你需要调用和列数量相同次数的withColumn函数。当有大量的列时,Spark查询优化引擎Catalyst可能会感到有些不堪重负(我遇到过类似情况),在尝试使用数千个列进行实验时,甚至还导致驱动程序的OOM。为了避免过多地压力Catalyst(并且写更少的代码;-)),你可以简单地使用下面的select命令来完成这个任务:

val data = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF
// let's assume that we have a map that associates column names to their values
val columnMap = Map("col1" -> "val1", "col2" -> "val2")
// Let's create the new columns from the map
val newCols = columnMap.keys.map(k => lit(columnMap(k)) as k)
// selecting the old columns + the new ones
data.select(data.columns.map(col) ++ newCols : _*).show
+-----+---+----+----+
|   _1| _2|col1|col2|
+-----+---+----+----+
|  one|  1|val1|val2|
|  two|  2|val1|val2|
|three|  3|val1|val2|
| four|  4|val1|val2|
+-----+---+----+----+

5

与递归相反,使用foldLeft更为普遍的方法可以更加通用,对于有限数量的列而言。使用Databricks Notebook:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

import spark.implicits._

val columnNames = Seq("c3","c4")
val df = Seq(("one", 1), ("two", 2), ("three", 3), ("four", 4)).toDF("c1", "c2")

def addCols(df: DataFrame, columns: Seq[String]): DataFrame = {
    columns.foldLeft(df)((acc, col) => {
      acc.withColumn(col, lit(col)) })
}

val df2 = addCols(df, columnNames)
df2.show(false)

返回:

+-----+---+---+---+
|c1   |c2 |c3 |c4 |
+-----+---+---+---+
|one  |1  |c3 |c4 |
|two  |2  |c3 |c4 |
|three|3  |c3 |c4 |
|four |4  |c3 |c4 |
+-----+---+---+---+

请注意以下内容:https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015,尽管在稍微不同的背景下,另一个答案通过选择方法暗示了这一点。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接