在Apache Spark数据框中展开数组

5

我正在尝试展开一个带有嵌套字段的现有数据框架。 我的数据框架的结构如下:

root  
|-- Id: long (nullable = true)  
|-- Type: string (nullable = true)  
|-- Uri: string (nullable = true)    
|-- Type: array (nullable = true)  
|    |-- element: string (containsNull = true)  
|-- Gender: array (nullable = true)  
|    |-- element: string (containsNull = true)

类型和性别可以包含元素数组、一个元素或空值。

我尝试使用以下代码:

var resDf = df.withColumn("FlatType", explode(df("Type")))

但是,作为结果,我失去了Type列为空的行。这意味着,例如,如果我有10行数据,其中7行Type列为空,而3行不为空,则在使用explode函数后,结果数据框中只剩下3行。
我该如何保留Type列为空的行,同时展开值数组?
我找到了一种解决方法,但仍然卡在一个地方。对于标准类型,我们可以按照以下方式操作:
def customExplode(df: DataFrame, field: String, colType: String): org.apache.spark.sql.Column = {
var exploded = None: Option[org.apache.spark.sql.Column]
colType.toLowerCase() match {
  case "string" => 
    val avoidNull = udf((column: Seq[String]) =>
    if (column == null) Seq[String](null)
    else column)
    exploded = Some(explode(avoidNull(df(field))))
  case "boolean" => 
    val avoidNull = udf((xs: Seq[Boolean]) =>
    if (xs == null) Seq[Boolean]()
    else xs)
    exploded = Some(explode(avoidNull(df(field))))
  case _ => exploded = Some(explode(df(field)))
}
exploded.get

接下来只需像这样使用它:

val explodedField = customExplode(resultDf, fieldName, fieldTypeMap(field))
resultDf = resultDf.withColumn(newName, explodedField)

然而,我在以下结构类型中遇到了问题:

(涉及IT技术)


 |-- Address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- AddressType: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true) 
 |    |    |-- DEA: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- Number: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- ExpirationDate: array (nullable = true)
 |    |    |    |    |    |-- element: timestamp (containsNull = true)
 |    |    |    |    |-- Status: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)

当 DEA 为空时,我们如何处理这种模式?

提前感谢你。

附言:我尝试使用 Lateral views,但结果仍然相同。

2个回答

4
也许你可以尝试使用when
val resDf = df.withColumn("FlatType", when(df("Type").isNotNull, explode(df("Type")))

根据 when 函数的 文档,对于不符合条件的值,会插入 null 值。

抱歉,但是当我尝试这个解决方案时,我遇到了以下异常:java.lang.UnsupportedOperationException。如果我用某个值替换explode(df("Type")),它就可以正常工作。我想这个函数不支持作为值的爆炸列。 - Artem
@Artem,你说得对,我很抱歉。union是你的一个选项吗? 你可以执行以下操作:df.where($"Type".isNull).withColumn("FlatType", lit(null)).unionAll(df.withColumn("FlatType", explode($"Type"))) - Daniel de Paula
是的,谢谢。我考虑过这个选项,但我正在构建一个通用算法来展平模式,我担心联合可能会非常慢。我希望能找到更好的解决方案,但联合对我来说是备选选项。 - Artem

1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接