我有一个Spark DataFrame,其模式如下:
root
|-- mapkey: map (nullable = true)
| |-- key: string
| |-- value: array (valueContainsNull = true)
| | |-- element: struct (containsNull = true)
| | | |-- id: string (nullable = true)
| | | |-- bt: string (nullable = true)
| | | |-- bp: double (nullable = true)
| | | |-- z: struct (nullable = true)
| | | | |-- w: integer (nullable = true)
| | | | |-- h: integer (nullable = true)
|-- uid: string (nullable = true)
我想编写一个UDF来过滤地图键,使键等于uid,并且仅返回通过过滤器的值。我正在尝试以下内容:
val filterMap = udf((m: Map[String, Seq[Row]], uid: String) => {
val s = Set(uid)
m.filterKeys { s.contains(_) == true }
})
但是我遇到了以下错误: java.lang.UnsupportedOperationException: 不支持类型为 org.apache.spark.sql.Row 的模式。在 ScalaReflection.scala 的第 762 行应用了 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1 方法,在 ScalaReflection.scala 的第 704 行应用了该方法。在 TypeConstraints.scala 的第 56 行撤销了 scala.reflect.internal.tpe.TypeConstraints$UndoLog 操作。ScalaReflection.scala 的第 809 行清理了反射对象。ScalaReflection$.schemaFor 在 ScalaReflection.scala 的第 703 行实现。在 functions.scala 的第 3200 行使用了 functions$.udf。
有人能指出UDF的问题在哪里吗?