使用Spark Scala计算平均值

3
如何使用Spark Scala计算以下两个数据集中每个位置的平均薪资?
File1.csv(第4列为薪资)
Ram, 30, Engineer, 40000  
Bala, 27, Doctor, 30000  
Hari, 33, Engineer, 50000  
Siva, 35, Doctor, 60000

File2.csv(第2列为位置)

Hari, Bangalore  
Ram, Chennai  
Bala, Bangalore  
Siva, Chennai  

上述文件未排序。需要合并这两个文件并查找每个地点的平均工资。我尝试使用以下代码,但无法成功。
val salary = sc.textFile("File1.csv").map(e => e.split(","))  
val location = sc.textFile("File2.csv").map(e.split(","))  
val joined = salary.map(e=>(e(0),e(3))).join(location.map(e=>(e(0),e(1)))  
val joinedData = joined.sortByKey()  
val finalData = joinedData.map(v => (v._1,v._2._1._1,v._2._2))  
val aggregatedDF = finalData.map(e=> e.groupby(e(2)).agg(avg(e(1))))    
aggregatedDF.repartition(1).saveAsTextFile("output.txt")  

请提供代码和样本输出,以便查看。
非常感谢。
4个回答

5

您可以将CSV文件读入DataFrames中,然后进行连接和分组以获得平均值:

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).agg(avg(df1("salary")).as("average")).
  select("location", "average")

dfAverage.show
+-----------+-------+
|   location|average|
+-----------+-------+
|Bangalore  |40000.0|
|  Chennai  |50000.0|
+-----------+-------+

[更新] 计算平均尺寸的方法:
// file1.csv:
Ram,30,Engineer,40000,600*200
Bala,27,Doctor,30000,800*400
Hari,33,Engineer,50000,700*300
Siva,35,Doctor,60000,600*200

// file2.csv
Hari,Bangalore
Ram,Chennai
Bala,Bangalore
Siva,Chennai

val df1 = spark.read.csv("/path/to/file1.csv").toDF(
  "name", "age", "title", "salary", "dimensions"
)

val df2 = spark.read.csv("/path/to/file2.csv").toDF(
  "name", "location"
)

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

val dfAverage = df1.join(df2, Seq("name")).
  groupBy(df2("location")).
  agg(
    avg(split(df1("dimensions"), ("\\*")).getItem(0).cast(IntegerType)).as("avg_length"),
    avg(split(df1("dimensions"), ("\\*")).getItem(1).cast(IntegerType)).as("avg_width")
  ).
  select(
    $"location", $"avg_length", $"avg_width",
    concat($"avg_length", lit("*"), $"avg_width").as("avg_dimensions")
  )

dfAverage.show
+---------+----------+---------+--------------+
| location|avg_length|avg_width|avg_dimensions|
+---------+----------+---------+--------------+
|Bangalore|     750.0|    350.0|   750.0*350.0|
|  Chennai|     600.0|    200.0|   600.0*200.0|
+---------+----------+---------+--------------+

谢谢回复。假设列中不是薪水,而是像600200(长度宽度)这样的尺寸,我该如何计算平均值?例如:Ram 600200,Hari 700300等等... - akrockz
@akrockz,请查看扩展答案。 - Leo C
非常感谢@Leo C.。这正是我在寻找的东西。最后一个请求...目前我的笔记本电脑上没有设置Spark..如果我将输入数据发送给您,您是否可以将输出发送给我? 很抱歉问了这么多问题..谢谢。 - akrockz
@akrockz,根据我的系统使用政策,运行代码或使用来自未知来源的数据是不允许的。很抱歉无法帮助你解决这个问题。 - Leo C
如果最终我想要写入一个CSV文件,我可以使用以下命令吗? dfAverage.repartition(1).write.csv("output.csv") 这个会起作用吗? - akrockz
可以这样做。或者你可以用coalesce(1)替换repartition(1),以避免repartition()强制执行的数据洗牌。 - Leo C

3
我会使用DataFrame API,这应该可以解决问题:
val salary = sc.textFile("File1.csv")
               .map(e => e.split(","))
               .map{case Seq(name,_,_,salary) => (name,salary)}
               .toDF("name","salary")

val location = sc.textFile("File2.csv")
                 .map(e => e.split(","))
                 .map{case Seq(name,location) => (name,location)}
                 .toDF("name","location")

import org.apache.spark.sql.functions._

salary
  .join(location,Seq("name"))
  .groupBy($"location")
  .agg(
    avg($"salary").as("avg_salary")
  )
  .repartition(1)
  .write.csv("output.csv")

那么最终的输出看起来像下面这样?+------------------------+ | 地点 | 平均薪资 | +------------------------+ | 班加罗尔 | 40000 | | 金奈 | 500000 | +------------------------+ - akrockz
还有一个疑问...假设列中不是薪水,而是像600200(长度宽度)这样的尺寸,我该如何计算平均值?Ram 600200 Hari 700300等等... - akrockz

2
我会使用数据框架: 首先读取数据框架,例如:
val salary = spark.read.option("header", "true").csv("File1.csv")
val location = spark.read.option("header", "true").csv("File2.csv")

如果您没有标题,则需要将选项设置为“false”,并使用withColumnRenamed更改默认名称。

val salary = spark.read.option("header", "false").csv("File1.csv").toDF("name", "age", "job", "salary")
val location = spark.read.option("header", "false").csv("File2.csv").toDF("name", "location")

现在进行连接操作:
val joined = salary.join(location, "name")

最后进行平均数计算:
val avg = joined.groupby("location").agg(avg($"salary"))

保存操作:

avg.repartition(1).write.csv("output.csv")

谢谢回复。假设列中不是薪水,而是像 600200(长度 * 宽度)这样的尺寸,那么我该如何计算平均值呢?例如:Ram 600200、Hari 700*300 等等... - akrockz
你的意思是什么?你是指每个名称的多个出现次数,每个名称都有多列吗? - Assaf Mendelson

0
你可以像这样做:
val salary = sc.textFile("File1.csv").map(_.split(",").map(_.trim))
val location = sc.textFile("File2.csv").map(_.split(",").map(_.trim))
val joined = salary.map(e=>(e(0),e(3).toInt)).join(location.map(e=>(e(0),e(1))))
val locSalary = joined.map(v => (v._2._2, v._2._1))
val averages = locSalary.aggregateByKey((0,0))((t,e) => (t._1 + 1, t._2 + e),
        (t1,t2) => (t1._1 + t2._1, t1._2 + t2._2)).mapValues(t => t._2/t._1)

那么averages.take(10)将会返回:

res5: Array[(String, Int)] = Array((Chennai,50000), (Bangalore,40000))

谢谢您的回复。假设列中没有薪水,而是像600200(长度宽度)这样的尺寸,请问我该如何计算平均值呢? Ram 600200 Hari 700300 等等... - akrockz
这些维度是以字符串形式给出的吗?您想要计算面积(长度乘以宽度)的平均值,还是每个维度都有一个平均值? - Harald Gliebe
我想要每个维度的平均值,按位置分组。 - akrockz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接