如何在Spark/Scala中对数据框(Dataframe)的一列进行求和?

48

我有一个数据框(Dataframe),是从CSV文件中读取的,有很多列,例如:时间戳(timestamp)、步数(steps)、心率(heartrate)等。

我想要对每一列的值进行求和,比如说在“steps”列上求步数的总和。

据我所见,我需要使用这些函数: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$

但是我不知道如何使用sum函数。

当我写下以下内容时:

val df = CSV.load(args(0))
val sumSteps = df.sum("steps") 

函数sum无法解析。

我是否错误地使用了函数sum? 我需要先使用函数map吗? 如果是,应该怎么做?

一个简单的例子会非常有帮助! 我最近开始学习Scala。

5个回答

114

您必须先导入相关函数:

import org.apache.spark.sql.functions._

然后您可以像这样使用它们:

val df = CSV.load(args(0))
val sumSteps =  df.agg(sum("steps")).first.get(0)

如果需要,您也可以将结果转换为其他类型:

val sumSteps: Long = df.agg(sum("steps").cast("long")).first.getLong(0)

编辑:

对于多列(如“col1”,“col2”等),您可以一次获取所有聚合结果:

val sums = df.agg(sum("col1").as("sum_col1"), sum("col2").as("sum_col2"), ...).first

编辑2:

为了动态应用聚合,以下选项可用:

  • 一次应用于所有数字列:
df.groupBy().sum()
  • 应用于一系列数值列的名称:
val columnNames = List("col1", "col2")
df.groupBy().sum(columnNames: _*)
  • 应用别名和/或强制转换到数字列名列表:
val cols = List("col1", "col2")
val sums = cols.map(colName => sum(colName).cast("double").as("sum_" + colName))
df.groupBy().agg(sums.head, sums.tail:_*).show()

26

如果您想要对一列的所有值进行求和,最高效的方法是使用DataFrame内部的RDDreduce

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val df = sc.parallelize(Array(10,2,3,4)).toDF("steps")
df.select(col("steps")).rdd.map(_(0).asInstanceOf[Int]).reduce(_+_)

//res1 Int = 19

7
不错的选择!如果他想要多列的总和,这仍然更有效吗?在DataFrame中,可以像df.agg(sum("col1"), sum("col2"), ...)这样做。 - Daniel de Paula
2
哦,我读到“我想要对每一列的值求和(...)”时,我以为他是指很多列。无论如何,我的问题更多是出于好奇心,以帮助改进我们的答案。 - Daniel de Paula
1
@DanieldePaula,确实您的答案是正确的,我的只是一个替代方案(仅适用于一个列),所以我会投票支持您的答案。 - Alberto Bonsanto
我喜欢你的替代方案!如果我可以再问一遍,你知道它在同时处理多列时是否仍然有效吗? - Daniel de Paula
1
我将第二个回答设为正确答案,因为我想要一列值的总和。然而后来我需要均值和其他统计方法,所以我认为我会使用类似于第一个答案的语法。 - Ectoras
显示剩余2条评论

9

4

不确定在这个问题被问及时是否已经存在,但是:

df.describe().show("columnName")

在一列上给出平均值、计数和标准偏差统计数据。如果只使用.show(),我认为它会返回所有列的统计数据。


0

使用Spark SQL查询..以防有人需要!

import org.apache.spark.sql.SparkSession 
import org.apache.spark.SparkConf 
import org.apache.spark.sql.functions._ 
import org.apache.spark.SparkContext 
import java.util.stream.Collectors

val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val spark = SparkSession.builder.config(conf).getOrCreate()
val df = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6, 7)).toDF()

df.createOrReplaceTempView("steps")
val sum = spark.sql("select  sum(steps) as stepsSum from steps").map(row => row.getAs("stepsSum").asInstanceOf[Long]).collect()(0)
println("steps sum = " + sum) //prints 28

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接