Spark多个动态聚合函数,countDistinct无法工作。

4

使用Scala在Spark DataFrame上进行聚合操作,实现多个动态聚合操作(由用户在JSON中传递)。将JSON转换为Map

以下是一些示例数据:

colA    colB    colC    colD
1       2       3       4
5       6       7       8
9       10      11      12

我使用的Spark聚合代码如下:

var cols = ["colA","colB"]
var aggFuncMap = Map("colC"-> "sum", "colD"-> "countDistinct")
var aggregatedDF = currentDF.groupBy(cols.head, cols.tail: _*).agg(aggFuncMap)

我必须将aggFuncMap作为Map传递,以便用户可以通过JSON配置传递任意数量的聚合函数。上述代码对于一些聚合函数(包括summinmaxavgcount)已经能很好地工作。但是,不幸的是,该代码不能处理countDistinct(可能因为它是驼峰式命名?)。运行以上代码会导致以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Undefined function: 'countdistinct'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'

感谢您的帮助!
1个回答

8

目前不可能在 Map 内使用带有 countDistinctagg。从 文档 中我们看到:

可用的聚合方法是 avg、max、min、sum 和 count。


一个可能的解决办法是将 Map 更改为 Seq[Column]

val cols = Seq("colA", "colB")
val aggFuncs = Seq(sum("colC"), countDistinct("colD"))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

但如果用户需要在配置文件中指定聚合操作,那么这并没有太大帮助。

另一种方法是使用expr函数,该函数将评估一个字符串并返回一个列。然而,expr不接受"countDistinct",必须使用"count(distinct(...))"。 可以编写以下代码:

val aggFuncs = Seq("sum(colC)", "count(distinct(colD))").map(e => expr(e))
val df2 = df.groupBy(cols.head, cols.tail: _*).agg(aggFuncs.head, aggFuncs.tail: _*)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接