从Spark DataFrame的单个列派生多个列

53

我有一个包含大量元数据的DataFrame,其中所有信息都存储在单个字符串列ColmnA中。让我们称之为DFA。

我想通过一个函数将此列ColmnA拆分为多个列。假设函数名为Func1,返回一个ClassXYZ类,其中包含多个变量,每个变量都需要映射到新列,如ColmnA1、ColmnA2等。

我该如何调用Func1仅一次即可完成从一个DataFrame到另一个DataFrame的转换,并添加这些附加列,而不必重复调用Func1来创建所有列。

如果每次添加新列都要调用这个庞大的函数,那么解决起来很容易,但我希望避免这种情况。

请提供一个有效的代码示例或伪代码。

谢谢

Sanjay

5个回答

68

总的来说,你想要的直接实现是不可能的。UDF只能一次返回一列。有两种不同的方法可以克服这个限制:

  1. 返回一个复杂类型的列。最通用的解决方案是StructType,但你也可以考虑使用ArrayTypeMapType

import org.apache.spark.sql.functions.udf

val df = Seq(
  (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
).toDF("x", "y", "z")

case class Foobar(foo: Double, bar: Double)

val foobarUdf = udf((x: Long, y: Double, z: String) => 
  Foobar(x * y, z.head.toInt * y))

val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
df1.show
// +---+----+---+------------+
// |  x|   y|  z|      foobar|
// +---+----+---+------------+
// |  1| 3.0|  a| [3.0,291.0]|
// |  2|-1.0|  b|[-2.0,-98.0]|
// |  3| 0.0|  c|   [0.0,0.0]|
// +---+----+---+------------+

df1.printSchema
// root
//  |-- x: long (nullable = false)
//  |-- y: double (nullable = false)
//  |-- z: string (nullable = true)
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: double (nullable = false)
//  |    |-- bar: double (nullable = false)

这个稍后可以很容易地被展开,但通常没有必要。

  • 切换到RDD,重塑并重建DF:

  • import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row
    
    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] = 
      Seq(x * y, z.head.toInt * y)
    
    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))
    
    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))
    
    val df2 = sqlContext.createDataFrame(rows, schema)
    
    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
    

    3
    当你说“通常没有[展平列]”时,为什么会这样?或者Spark是否允许使用顶级列所做的大多数操作也可以用于分层数据(例如df1.foobar.foo)? - max
    2
    @max 因为简单的“结构体”可以在几乎任何情况下使用,当我们通常使用简单的点语法fooobar.foo时。但这并不适用于集合类型。您还可以查看https://dev59.com/PV4c5IYBdhLWcg3wFW6N#33850490 - zero323
    你可以尝试一种不同的方法来给数据框列赋值,在上面的例子中使用"withColumn"是这样的:val df1 = df.withColumn("foo", foobarUdf($"x", $"y", $"z").getField("foo")).withColumn("bar", foobarUdf($"x", $"y", $"z").getField("bar"))现在,模式有了两个新列:"foo"和"bar"。 - evinhas

    18

    假设在您的函数执行后,将出现一系列元素,例如以下示例:

    val df = sc.parallelize(List(("Mike,1986,Toronto", 30), ("Andre,1980,Ottawa", 36), ("jill,1989,London", 27))).toDF("infoComb", "age")
    df.show
    +------------------+---+
    |          infoComb|age|
    +------------------+---+
    |Mike,1986,Toronto| 30|
    | Andre,1980,Ottawa| 36|
    |  jill,1989,London| 27|
    +------------------+---+
    

    现在你可以用这个 infoComb 对象做的事情是,你可以开始拆分字符串,并使用以下代码获取更多列:

    df.select(expr("(split(infoComb, ','))[0]").cast("string").as("name"), expr("(split(infoComb, ','))[1]").cast("integer").as("yearOfBorn"), expr("(split(infoComb, ','))[2]").cast("string").as("city"), $"age").show
    +-----+----------+-------+---+
    | name|yearOfBorn|   city|age|
    +-----+----------+-------+---+
    |Mike|      1986|Toronto| 30|
    |Andre|      1980| Ottawa| 36|
    | jill|      1989| London| 27|
    +-----+----------+-------+---+
    

    希望这可以帮到您。

    你能不能直接这样写: df.select('infoComb.', 'age') 在列名后面加上 . 可以选择结构体中的每个字段作为新列。 - Malcolm McRoberts

    5

    如果你的结果列与原始列的长度相同,你可以使用 withColumn 函数创建全新的列,并应用 UDF。然后,你可以删除原始列,例如:

     val newDf = myDf.withColumn("newCol1", myFun(myDf("originalColumn")))
    .withColumn("newCol2", myFun2(myDf("originalColumn"))
    .drop(myDf("originalColumn"))
    

    这里的myFun是一个用户自定义函数,定义如下:

       def myFun= udf(
        (originalColumnContent : String) =>  {
          // do something with your original column content and return a new one
        }
      )
    

    嗨Niemand,感谢您的回复...但它并没有解决问题... 在您的代码中,您多次调用函数"myDF",而我希望该函数被调用一次,生成一个具有多个字段的类,并将每个字段变量作为新列返回。 - sshroff
    很抱歉,我提出的可能是唯一可行的方法,我认为没有其他方法存在,但希望我是错的;)另外请注意,我并没有多次调用myFun-您可以调用其他函数,例如myFun2、myFun3等来创建所需的列。 - TheMP

    2

    我选择创建一个函数来展平一个列,然后与UDF同时调用它。

    首先定义如下:


    implicit class DfOperations(df: DataFrame) {
    
      def flattenColumn(col: String) = {
        def addColumns(df: DataFrame, cols: Array[String]): DataFrame = {
          if (cols.isEmpty) df
          else addColumns(
            df.withColumn(col + "_" + cols.head, df(col + "." + cols.head)),
            cols.tail
          )
        }
    
        val field = df.select(col).schema.fields(0)
        val newCols = field.dataType.asInstanceOf[StructType].fields.map(x => x.name)
    
        addColumns(df, newCols).drop(col)
      }
    
      def withColumnMany(colName: String, col: Column) = {
        df.withColumn(colName, col).flattenColumn(colName)
      }
    
    }
    

    然后使用非常简单:
    case class MyClass(a: Int, b: Int)
    
    val df = sc.parallelize(Seq(
      (0),
      (1)
    )).toDF("x")
    
    val f = udf((x: Int) => MyClass(x*2,x*3))
    
    df.withColumnMany("test", f($"x")).show()
    
    //  +---+------+------+
    //  |  x|test_a|test_b|
    //  +---+------+------+
    //  |  0|     0|     0|
    //  |  1|     2|     3|
    //  +---+------+------+
    

    你不必使用整个 withColumnMany 的方法。只需使用 select("select.*") 来展开它即可。 - Assaf Mendelson

    -3

    使用透视函数可以轻松实现此目标

    df4.groupBy("year").pivot("course").sum("earnings").collect() 
    

    我在任何答案或原帖中都没有看到“年份”,“课程”或“收益”...你在这个非常简短的回答中所说的数据框是什么? - Kai

    网页内容由stack overflow 提供, 点击上面的
    可以查看英文原文,
    原文链接