我正在学习 Databricks 的一个示例。数据框的模式如下:
> parquetDF.printSchema
root
|-- department: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- employees: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- firstName: string (nullable = true)
| | |-- lastName: string (nullable = true)
| | |-- email: string (nullable = true)
| | |-- salary: integer (nullable = true)
在这个例子中,他们展示了如何将员工列拆分成4个附加列。val explodeDF = parquetDF.explode($"employees") {
case Row(employee: Seq[Row]) => employee.map{ employee =>
val firstName = employee(0).asInstanceOf[String]
val lastName = employee(1).asInstanceOf[String]
val email = employee(2).asInstanceOf[String]
val salary = employee(3).asInstanceOf[Int]
Employee(firstName, lastName, email, salary)
}
}.cache()
display(explodeDF)
如何使用部门列做类似的操作(即向数据框添加名为"id"和"name"的两列)? 方法并不完全相同,我只能想出如何创建一个全新的数据帧:
val explodeDF = parquetDF.select("department.id","department.name")
display(explodeDF)
如果我尝试:
val explodeDF = parquetDF.explode($"department") {
case Row(dept: Seq[String]) => dept.map{dept =>
val id = dept(0)
val name = dept(1)
}
}.cache()
display(explodeDF)
我收到了警告和错误:
<console>:38: warning: non-variable type argument String in type pattern Seq[String] is unchecked since it is eliminated by erasure
case Row(dept: Seq[String]) => dept.map{dept =>
^
<console>:37: error: inferred type arguments [Unit] do not conform to method explode's type parameter bounds [A <: Product]
val explodeDF = parquetDF.explode($"department") {
^
employees
也有一个元素,而department
没有。 - gsamarasemployees
示例创建了新行,而department
示例只应该创建两个新列。 - Feynman27department.id
->inner_id
,department.name
->inner_name
,... - Saddle Point