使用Scala编写的UDF来按键过滤Map

3

我有一个Spark DataFrame,其模式如下:

root
 |-- mapkey: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- bt: string (nullable = true)
 |    |    |    |-- bp: double (nullable = true)
 |    |    |    |-- z: struct (nullable = true)
 |    |    |    |    |-- w: integer (nullable = true)
 |    |    |    |    |-- h: integer (nullable = true)
 |-- uid: string (nullable = true)

我想编写一个UDF来过滤地图键,使键等于uid,并且仅返回通过过滤器的值。我正在尝试以下内容:

val filterMap = udf((m: Map[String, Seq[Row]], uid: String) => {
    val s = Set(uid)
    m.filterKeys { s.contains(_) == true }
})

但是我遇到了以下错误:

java.lang.UnsupportedOperationException: 不支持类型为 org.apache.spark.sql.Row 的模式。在 ScalaReflection.scala 的第 762 行应用了 org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1 方法,在 ScalaReflection.scala 的第 704 行应用了该方法。在 TypeConstraints.scala 的第 56 行撤销了 scala.reflect.internal.tpe.TypeConstraints$UndoLog 操作。ScalaReflection.scala 的第 809 行清理了反射对象。ScalaReflection$.schemaFor 在 ScalaReflection.scala 的第 703 行实现。在 functions.scala 的第 3200 行使用了 functions$.udf。
有人能指出UDF的问题在哪里吗?
1个回答

1

看起来你唯一的选择是使用与此 Row 的内部结构匹配的 case class:

case class MyStruct(w: Int, h: Int)
case class Element(id: String, bt: String, bp: Double, z: MyStruct)

你可以在自定义函数中使用它(令人惊讶的是):
// sample data:
val df = Seq(
  (Map(
    "key1" -> Array(Element("1", "bt1", 0.1, MyStruct(1, 2)), Element("11", "bt11", 0.2, MyStruct(1, 3))),
    "key2" -> Array(Element("2", "bt2", 0.2, MyStruct(12, 22)))
  ), "key2")
).toDF("mapkey", "uid")

df.printSchema() // prints the right schema, as expected in post

// define UDF:
val filterMap = udf((m: Map[String, Seq[Element]], uid: String) => {
  m.filterKeys(_ == uid)
})

// use UDF:
df.withColumn("result", filterMap($"mapkey", $"uid")).show(false)

// prints:
// +-----------------------------------------------------------------+
// |result                                                           |
// +-----------------------------------------------------------------+
// |Map(key1 -> WrappedArray([1,bt1,0.1,[1,2]], [11,bt11,0.2,[1,3]]))|
// +-----------------------------------------------------------------+

我一直无法让这个方法正常工作。我总是会得到一个“Spark GenericRowWithSchema 无法转换为 XXX”错误。我发现将复杂类型传递给 UDF 的唯一方法是将其作为 Row 传递。 - Ted
有趣 - 也许Spark处理方式有所改变 - 这在我使用的Spark 2.3.0上有效,我想知道你使用的是哪个版本。如果您使用您的版本能够运行上述示例,那么将会很有趣。 - Tzach Zohar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接