I'm using Scala in the Spark shell. I have reduced my data down to an RDD; what I currently have is:
byHour: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[47] at reduceByKey at <console>:16
or, if collected to an array, byHour: Array[(String, Int)],
which looks like this: Array((6497+2006-03-19 20:00,13), (7511+2006-03-17 02:00,1), (13508+2006-03-26 10:00,4), (217+2006-05-16 16:00,1), (12404+2006-03-27 15:00,1), (9777+2006-05-14 09:00,1), (10291+2006-03-03 17:00,2), (4781+2006-05-10 14:00,2), (10291+2006-04-26 17:00,1), (15198+2006-04-26 12:00,1))
I would like to store this as something like a nested dictionary in Python, or as a CSV file.
In Python I would create
{"6497": {"2006-03-19 20:00": 13, "2006-03-19 22:00": 1}, "7511": {"2006-03-17 02:00": 1}...}
Ultimately, what I want is:
userid, 2006-03-17 01:00, 2006-03-17 02:00, ... , 2006-03-19 20:00, 2006-03-19 21:00, 2006-03-19 22:00
6497,0,0, ..., 13,0,1
7511,0,1, ..., 0,0,0
I am not sure how to accomplish this in Scala. I think I need either a collection of hash maps or a nested map along the lines of HashMap[String, HashMap[String, Int]].
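For reference, a minimal sketch of that nested shape in plain Scala collections (the variable names are illustrative and the values are copied from the sample data above):

// A nested map: user id -> (hour -> count), mirroring the Python dict above.
val counts: Map[String, Map[String, Int]] = Map(
  "6497" -> Map("2006-03-19 20:00" -> 13, "2006-03-19 22:00" -> 1),
  "7511" -> Map("2006-03-17 02:00" -> 1)
)
// Missing (user, hour) cells default to 0 when read back out:
val c = counts.getOrElse("6497", Map.empty[String, Int]).getOrElse("2006-03-19 22:00", 0)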
UPDATE: byHour is an RDD[(String, Int)].
// Split the "userid+timestamp" key once into (userid, (timestamp, count)).
val byUserHour = byHour.map { x =>
  val Array(user, time) = x._1.split("\\+", 2)
  (user, (time, x._2))
}
val byUser = byUserHour.groupByKey

// Collect the distinct timestamps in sorted order to serve as the CSV
// columns, and broadcast them so every task sees the same column order.
val times = byUserHour.map(_._2._1).distinct.collect.sortWith(_ < _)
val broadcastTimes = sc.broadcast(times)

// Turn each user's (timestamp, count) pairs into a lookup map.
val userMaps = byUser.mapValues(_.toMap)

// One row per user: the userid followed by a count for every timestamp,
// defaulting to 0 for hours where the user has no activity.
val result = userMaps.map {
  case (u, ut) => u +: broadcastTimes.value.map(ut.getOrElse(_, 0).toString)
}
val lines = result.map(_.mkString(","))
val header = List("userid") ::: times.toList
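From there, one way to finish (a sketch under assumptions I have not verified: the output path is made up, and coalesce(1) is only reasonable when the result fits comfortably in one partition):

// Prepend the header row and write the table out as CSV text.
// Without a shuffle, coalesce(1) concatenates partitions in order,
// so the header line ends up first; "byUserHour.csv" is a hypothetical path.
val headerLine = header.mkString(",")
val csv = sc.parallelize(Seq(headerLine)).union(lines)
csv.coalesce(1).saveAsTextFile("byUserHour.csv")

Alternatively, if the result is small enough, collect the lines to the driver and write them with plain java.io.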