使用Scala在Spark DataFrame上进行聚合操作,实现多个动态聚合操作(由用户在JSON中传递)。将JSON转换为Map
。
以下是一些示例数据:
colA colB colC colD
1 2 3 4
5 6 7 8
9 10 11 12
我使用的Spark聚合代码如下:
var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)
我必须将
aggFuncMap
作为Map
传递,以便用户可以通过JSON配置传递任意数量的聚合函数。上述代码对于一些聚合函数(包括sum
、min
、max
、avg
和count
)已经能很好地工作。但是,不幸的是,该代码不能处理countDistinct
(可能因为它是驼峰式命名?)。运行以上代码会导致以下错误:感谢您的帮助!Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'