在Spark数据框中展开嵌套结构体

34

我正在学习 Databricks 的一个示例。数据框的模式如下:

> parquetDF.printSchema
root
|-- department: struct (nullable = true)
|    |-- id: string (nullable = true)
|    |-- name: string (nullable = true)
|-- employees: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- firstName: string (nullable = true)
|    |    |-- lastName: string (nullable = true)
|    |    |-- email: string (nullable = true)
|    |    |-- salary: integer (nullable = true)
在这个例子中,他们展示了如何将员工列拆分成4个附加列。
val explodeDF = parquetDF.explode($"employees") { 
case Row(employee: Seq[Row]) => employee.map{ employee =>
  val firstName = employee(0).asInstanceOf[String]
  val lastName = employee(1).asInstanceOf[String]
  val email = employee(2).asInstanceOf[String]
  val salary = employee(3).asInstanceOf[Int]
  Employee(firstName, lastName, email, salary)
 }
}.cache()
display(explodeDF)

如何使用部门列做类似的操作(即向数据框添加名为"id"和"name"的两列)? 方法并不完全相同,我只能想出如何创建一个全新的数据帧:

val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)

如果我尝试:

val explodeDF = parquetDF.explode($"department") { 
  case Row(dept: Seq[String]) => dept.map{dept => 
  val id = dept(0) 
  val name = dept(1)
  } 
}.cache()
display(explodeDF)

我收到了警告和错误:

<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
            case Row(dept: Seq[String]) => dept.map{dept => 
                           ^
<console>:37: error: inferred type arguments [Unit] do not conform to    method explode's type parameter bounds [A <: Product]
  val explodeDF = parquetDF.explode($"department") { 
                                   ^
3个回答

49

25

一个阶段失败:org.apache.spark.SparkException: 由于阶段失败,作业中止:第41.0阶段中的任务0失败了4次,最近一次失败:在第41.0阶段中丢失了任务0.3(TID 1403, 10.81.214.49):scala.MatchError:[[789012,机械工程]](类org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)。 - Feynman27
@Feynman27 这个链接是否有帮助?它似乎与您的尝试相匹配。我认为我的答案问题在于employees也有一个元素,而department没有。 - gsamaras
是的,employees 示例创建了新行,而 department 示例只应该创建两个新列。 - Feynman27
相关问题:https://dev59.com/A10a5IYBdhLWcg3wuKw7 - Tagar
我们能否一次性对所有嵌套列进行重命名操作?例如,department.id -> inner_iddepartment.name -> inner_name,... - Saddle Point

3

这似乎可行(虽然可能不是最优雅的解决方案)。

var explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id"))
explodeDF2 = explodeDF2.withColumn("name", explodeDF2("department.name"))

3
您可以这样写:val explodeDF2 = explodeDF.withColumn("id", explodeDF("department.id")).withColumn("name", explodeDF("department.name")),这段代码的作用是将"department"列中的"id"和"name"字段拆分成两列。 - Davos

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接