Convert a dataframe to a specific structure

4

I have a flat dataframe (df) with the following structure:

root
 |-- first_name: string (nullable = true)
 |-- middle_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- end_Date: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- email_name: string (nullable = true)
 |-- company: struct (nullable = true)
 |-- org_name: string (nullable = true)
 |-- company_phone: string (nullable = true)
 |-- partition_column: string (nullable = true)

I need to transform this dataframe into a structure like the following:

root
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- currentPosition: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- title: string (nullable = true)
 |    |    |-- startDate: string (nullable = true)
 |    |    |-- endDate: string (nullable = true)
 |    |    |-- address: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- zipCode: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |-- emailName: string (nullable = true)
 |    |    |-- company: struct (nullable = true)
 |    |    |    |-- orgName: string (nullable = true)
 |    |    |    |-- companyPhone: string (nullable = true)
 |-- partitionColumn: string (nullable = true)

So far I have implemented the following:

case class IndividualCompany(orgName: String,
                             companyPhone: String)

case class IndividualAddress(city: String,
                   zipCode: String,
                   state: String,
                   country: String)

case class IndividualPosition(title: String,
                              startDate: String,
                              endDate: String,
                              address: IndividualAddress,
                              emailName: String,
                              company: IndividualCompany)

case class Individual(firstName: String,
                     middleName: String,
                     lastName: String,
                     currentPosition: Seq[IndividualPosition],
                     partitionColumn: String)


val makeCompany = udf((orgName: String, companyPhone: String) => IndividualCompany(orgName, companyPhone))
val makeAddress = udf((city: String, zipCode: String, state: String, country: String) => IndividualAddress(city, zipCode, state, country))

val makePosition = udf((title: String, startDate: String, endDate: String, address: IndividualAddress, emailName: String, company: IndividualCompany) 
                    => List(IndividualPosition(title, startDate, endDate, address, emailName, company)))


val selectData = df.select(
      col("first_name").as("firstName"),
      col("middle_name").as("middleName"),
      col("last_name").as("lastName"),
      makePosition(col("title"),
        col("start_date"),
        col("end_Date"),
        makeAddress(col("city"),
          col("zip_code"),
          col("state"),
          col("country")),
        col("email_name"),
        makeCompany(col("org_name"),
          col("company_phone"))).as("currentPosition"),
      col("partition_column").as("partitionColumn")
    ).as[Individual]

selectData.printSchema()
selectData.show(10)

I can see that the proper schema is generated for selectData, but I get an error on the last line, where I try to fetch some actual data. The error says it failed to execute a user defined function.

 org.apache.spark.SparkException: Failed to execute user defined function(anonfun$4: (string, string, string, struct<city:string,zipCode:string,state:string,country:string>, string, struct<orgName:string,companyPhone:string>) => array<struct<title:string,startDate:string,endDate:string,address:struct<city:string,zipCode:string,state:string,country:string>,emailName:string,company:struct<orgName:string,companyPhone:string>>>)

Is there a better way to achieve this?

Possible duplicate of Spark UDF for StructType / Row. - 10465355
2 Answers

2

The problem here is that a udf can't take IndividualAddress and IndividualCompany directly as input. In Spark these are represented as structs, and to use them in a udf the correct input type is Row. That means you need to change the declaration of makePosition:

val makePosition = udf((title: String, 
                        startDate: String, 
                        endDate: String, 
                        address: Row, 
                        emailName: String, 
                        company: Row) => ...)

udf 中,您现在需要使用例如 address.getAs[String]("city") 来访问 case class 元素,并且要使用整个类,您需要再次创建它。

A simpler and better alternative would be to do everything in a single udf, as follows:

val makePosition = udf((title: String, 
    startDate: String, 
    endDate: String, 
    city: String, 
    zipCode: String, 
    state: String, 
    country: String,
    emailName: String, 
    orgName: String, 
    companyPhone: String) => 
        Seq(
          IndividualPosition(
            title, 
            startDate, 
            endDate, 
            IndividualAddress(city, zipCode, state, country),
            emailName, 
            IndividualCompany(orgName, companyPhone)
          )
        )
)
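For completeness, an end-to-end sketch of this approach under stated assumptions: a local SparkSession, one made-up input row matching the question's flat schema, and the case classes from the question (they must be top-level for the Dataset encoder):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// Case classes copied from the question.
case class IndividualCompany(orgName: String, companyPhone: String)
case class IndividualAddress(city: String, zipCode: String, state: String, country: String)
case class IndividualPosition(title: String, startDate: String, endDate: String,
                              address: IndividualAddress, emailName: String,
                              company: IndividualCompany)
case class Individual(firstName: String, middleName: String, lastName: String,
                      currentPosition: Seq[IndividualPosition], partitionColumn: String)

object SingleUdfSketch {
  def run(): Individual = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    // One made-up row matching the flat schema from the question.
    val df = Seq(
      ("John", "M", "Doe", "Engineer", "2018-01-01", "2019-01-01",
       "Pune", "411001", "MH", "IN", "jdoe", "Acme Corp", "555-0100", "p1")
    ).toDF("first_name", "middle_name", "last_name", "title", "start_date", "end_Date",
      "city", "zip_code", "state", "country", "email_name", "org_name",
      "company_phone", "partition_column")

    // Single udf taking exactly 10 parameters (the maximum the udf() helper supports).
    val makePosition = udf((title: String, startDate: String, endDate: String,
                            city: String, zipCode: String, state: String, country: String,
                            emailName: String, orgName: String, companyPhone: String) =>
      Seq(IndividualPosition(title, startDate, endDate,
        IndividualAddress(city, zipCode, state, country),
        emailName, IndividualCompany(orgName, companyPhone))))

    val selectData = df.select(
      col("first_name").as("firstName"),
      col("middle_name").as("middleName"),
      col("last_name").as("lastName"),
      makePosition(col("title"), col("start_date"), col("end_Date"),
        col("city"), col("zip_code"), col("state"), col("country"),
        col("email_name"), col("org_name"), col("company_phone")).as("currentPosition"),
      col("partition_column").as("partitionColumn")
    ).as[Individual]

    selectData.printSchema()
    selectData.head()
  }
}
```

Calling SingleUdfSketch.run() prints the nested target schema and returns the first Individual, whose currentPosition is a one-element array with nested address and company structs.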

Thanks for the solution. Doing everything in a single udf can't take more than 10 parameters, which is why I chose nested udfs. - Harshad_Pardeshi
For the first solution, how do I pass the Row type to this UDF from the df.select() method? - Harshad_Pardeshi
1
Got it. I just modified my makeAddress and makeCompany methods as follows: val makeCompany = udf((orgName: String, companyPhone: String) => {Row(orgName, companyPhone)}, companySchema) - Harshad_Pardeshi
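The Row-returning variant mentioned in the comment above uses the untyped udf overload that takes an explicit result schema. A sketch under assumptions: companySchema is a StructType I define here, and on Spark 3 the untyped overload is disabled by default and needs the spark.sql.legacy.allowUntypedScalaUDF flag (on Spark 2.x the flag is not needed):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Legacy flag so the untyped udf overload also works on Spark 3.
val spark = SparkSession.builder
  .master("local[*]")
  .config("spark.sql.legacy.allowUntypedScalaUDF", "true")
  .getOrCreate()

// Assumed schema for the returned struct; field names follow the target schema.
val companySchema = StructType(Seq(
  StructField("orgName", StringType, nullable = true),
  StructField("companyPhone", StringType, nullable = true)))

// Untyped variant: the function returns a generic Row, so Spark cannot infer the
// result type and the schema is supplied explicitly as the second argument.
val makeCompany = udf((orgName: String, companyPhone: String) => Row(orgName, companyPhone), companySchema)
```

Applying makeCompany(col("org_name"), col("company_phone")) then yields a struct&lt;orgName:string,companyPhone:string&gt; column, which in turn can be passed to the Row-based makePosition.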

1
I had a similar requirement.
What I did was create a typed user-defined aggregation that produces a List of elements.
import org.apache.spark.sql.{Encoder, TypedColumn}
import org.apache.spark.sql.expressions.Aggregator
import scala.collection.mutable

object ListAggregator {
  private type Buffer[T] = mutable.ListBuffer[T]

  /** Returns a column that aggregates all elements of type T in a List. */
  def create[T](columnName: String)
               (implicit listEncoder: Encoder[List[T]], listBufferEncoder: Encoder[Buffer[T]]): TypedColumn[T, List[T]] =
    new Aggregator[T, Buffer[T], List[T]] {
      override def zero: Buffer[T] =
        mutable.ListBuffer.empty[T]

      override def reduce(buffer: Buffer[T], elem: T): Buffer[T] =
        buffer += elem

      override def merge(b1: Buffer[T], b2: Buffer[T]): Buffer[T] =
        if (b1.length >= b2.length) b1 ++= b2 else b2 ++= b1

      override def finish(reduction: Buffer[T]): List[T] =
        reduction.toList

      override def bufferEncoder: Encoder[Buffer[T]] =
        listBufferEncoder

      override def outputEncoder: Encoder[List[T]] =
        listEncoder
    }.toColumn.name(columnName)
}

Now you can use it like this.
import org.apache.spark.sql.SparkSession

val spark =
  SparkSession
    .builder
    .master("local[*]")
    .getOrCreate()

import spark.implicits._

final case class Flat(id: Int, name: String, age: Int)
final case class Grouped(age: Int, users: List[(Int, String)])

val data =
  List(
    (1, "Luis", 21),
    (2, "Miguel", 21),
    (3, "Sebastian", 16)
  ).toDF("id", "name", "age").as[Flat]

val grouped =
  data
    .groupByKey(flat => flat.age)
    .mapValues(flat => (flat.id, flat.name))
    .agg(ListAggregator.create(columnName = "users"))
    .map(tuple => Grouped(age = tuple._1, users = tuple._2))
// grouped: org.apache.spark.sql.Dataset[Grouped] = [age: int, users: array<struct<_1:int,_2:string>>]

grouped.show(truncate = false)
// +---+------------------------+
// |age|users                   |
// +---+------------------------+
// |16 |[[3, Sebastian]]        |
// |21 |[[1, Luis], [2, Miguel]]|
// +---+------------------------+

Content provided by Stack Overflow; see the original English post.