I have a dataframe with the following schema:
root
|-- x: Long (nullable = false)
|-- y: Long (nullable = false)
|-- features: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- score: double (nullable = true)
For example, I have this data:
+--------------------+--------------------+------------------------------------------+
| x | y | features |
+--------------------+--------------------+------------------------------------------+
|10 | 9 |[["f1", 5.9], ["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["f4", 0.9], ["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["f5", 5.9], ["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["f1", 5.9], ["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+------------------------------------------+
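For a reproducible example, a frame with this shape can be built roughly like this (the Feature case class and builder details are my own sketch; nullability flags may differ slightly):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical element type matching the (name, score) struct in the schema
case class Feature(name: String, score: Double)

val df = Seq(
  (10L, 9L, Seq(Feature("f1", 5.9), Feature("ft2", 6.0), Feature("ft3", 10.9))),
  (11L, 0L, Seq(Feature("f4", 0.9), Feature("ft1", 4.0), Feature("ft2", 0.9))),
  (20L, 9L, Seq(Feature("f5", 5.9), Feature("ft2", 6.4), Feature("ft3", 1.9))),
  (18L, 8L, Seq(Feature("f1", 5.9), Feature("ft4", 8.1), Feature("ft2", 18.9)))
).toDF("x", "y", "features")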
I want to filter the features that have a certain prefix, for example "ft". In the end, I want the following result:
+--------------------+--------------------+-----------------------------+
| x | y | features |
+--------------------+--------------------+-----------------------------+
|10 | 9 |[["ft2", 6.0], ["ft3", 10.9]]|
|11 | 0 |[["ft1", 4.0], ["ft2", 0.9] ]|
|20 | 9 |[["ft2", 6.4], ["ft3", 1.9] ]|
|18 | 8 |[["ft4", 8.1], ["ft2", 18.9]]|
+--------------------+--------------------+-----------------------------+
I am not on Spark 2.4+, so I cannot use the solution given here: Spark (Scala) filter array of structs without explode.
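For reference, my understanding is that on Spark 2.4+ this would be a one-liner with the filter higher-order function, something like (a sketch only, since I cannot use it):

import org.apache.spark.sql.functions.expr

// Spark 2.4+ only: keep elements whose name starts with "ft"
df.withColumn("features", expr("filter(features, f -> f.name LIKE 'ft%')"))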
I tried using a UDF instead, but could not get it to work. Here are my attempts. I defined a UDF:
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }
  )
But when I apply this UDF:
df.withColumn("filtered", filterFeature($"features"))
I got the error:
Schema for type org.apache.spark.sql.Row is not supported
I found that I cannot return Row from a UDF, apparently because the typed udf has to derive a result schema from the return type. Then I tried supplying the types explicitly:
import org.apache.spark.sql.types.{StringType, DoubleType}

def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, (StringType, DoubleType)
  )
Then I got an error:
error: type mismatch;
found : (org.apache.spark.sql.types.StringType.type, org.apache.spark.sql.types.DoubleType.type)
required: org.apache.spark.sql.types.DataType
}, (StringType, DoubleType)
^
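If I read this error right, the second argument to udf must be a single DataType describing the entire return value, presumably an array of structs built along these lines (my guess, untested):

import org.apache.spark.sql.types._

// One DataType covering the whole return value: an array of (name, score) structs
val featureType: DataType = ArrayType(
  StructType(Seq(
    StructField("name", StringType),
    StructField("score", DoubleType)
  ))
)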
Following the suggestions in some answers, I also tried a case class:
case class FilteredFeature(featureName: String, featureScore: Double)

def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, FilteredFeature
  )
But I got:
error: type mismatch;
found : FilteredFeature.type
required: org.apache.spark.sql.types.DataType
}, FilteredFeature
^
Then I tried:
case class FilteredFeature(featureName: String, featureScore: Double)

def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature]
  )
I got:
<console>:192: error: missing argument list for method apply in class GenericCompanion
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `apply _` or `apply(_)` instead of `apply`.
}, Seq[FilteredFeature]
^
And then I tried:
case class FilteredFeature(featureName: String, featureScore: Double)

def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features.filter { x =>
      x.getString(0).startsWith("ft")
    }, Seq[FilteredFeature](_)
  )
I got:
<console>:201: error: type mismatch;
found : Seq[FilteredFeature]
required: FilteredFeature
}, Seq[FilteredFeature](_)
^
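For reference, my current reading of those answers is that the case class is meant to be the return element type of the function itself rather than a schema argument, along these lines (my interpretation, not verified):

case class FilteredFeature(featureName: String, featureScore: Double)

// Map each Row to a case class so the typed udf can derive the result
// schema from Seq[FilteredFeature] instead of Seq[Row]
def filterFeature: UserDefinedFunction =
  udf((features: Seq[Row]) =>
    features
      .filter(_.getString(0).startsWith("ft"))
      .map(f => FilteredFeature(f.getString(0), f.getDouble(1)))
  )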
What should I do in this case?