如何在Spark中同时对多个列进行聚合

5

我有一个包含多列的数据框。我想按其中一列分组,并聚合其他所有列。比如说,表格有4列:cust_id、f1、f2、f3,我想按cust_id分组,然后获取f1、f2和f3的平均值。该表格可能有很多列。有什么提示吗?

以下代码是一个好的起点,但由于我有很多列,手动编写可能不是一个好主意。

df.groupBy("cust_id").agg(sum("f1"), sum("f2"), sum("f3"))

1
可能是SparkSQL:将聚合函数应用于列列表的重复问题。 - user6022341
1个回答

12

也许你可以尝试使用列名映射一个列表:

val groupCol = "cust_id"
val aggCols = (df.columns.toSet - groupCol).map(
  colName => avg(colName).as(colName + "_avg")
).toList

df.groupBy(groupCol).agg(aggCols.head, aggCols.tail: _*)

如果需要的话,您还可以根据类型匹配模式并构建聚合。

val aggCols = df.schema.collect {
  case StructField(colName, IntegerType, _, _) => avg(colName).as(colName + "_avg")
  case StructField(colName, StringType, _, _) => first(colName).as(colName + "_first")
}

我该如何将聚合列命名为类似于f1_avg的内容呢? - HHH
@H.Z. 只需要在后面加上 .as()。在第一个例子中:.map(colName => avg(colName).as(colName+"_avg")) 在第二个例子中,只需在函数后面加上 .as() - Daniel de Paula
太不可思议了!agg(aggCols:_*)可以,但是agg(aggCols.head,aggCols.tail:_*)却可以!纯魔法!您能否解释一下背后的原因?谢谢。 - J.J.
1
@JennyYueJin 如果您查看文档中可用的agg签名,没有agg(exprs: Column*)选项,只有头和尾选项。我不确定他们为什么选择了这种行为,但我认为这是为了避免使用空参数列表调用agg,例如df.groupBy("col_a").agg(),这在单个exprs: Column*参数中是可能的。 - Daniel de Paula

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接