在Scala中,什么数据结构类似于Python的嵌套字典或CSV文件?

4
我在Spark shell中使用Scala。我已将数据缩减为RDD,byHour: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[47] at reduceByKey at <console>:16 或者如果收集到一个数组,byHour: Array[(String, Int)],看起来像这样:
Array((6497+2006-03-19 20:00,13), (7511+2006-03-17 02:00,1), (13508+2006-03-26 10:00,4), (217+2006-05-16 16:00,1), (12404+2006-03-27 15:00,1), (9777+2006-05-14 09:00,1), (10291+2006-03-03 17:00,2), (4781+2006-05-10 14:00,2), (10291+2006-04-26 17:00,1), (15198+2006-04-26 12:00,1))

我希望将这个内容存储类似于Python中的嵌套字典或CSV文件。

在Python中,我会创建

{"6497": {"2006-03-19 20:00": 13, "2006-03-19 22:00": 1}, "7511": {"2006-03-17 02:00": 1}...}

最终我想要的是:
userid, 2006-03-17 01:00, 2006-03-17 02:00, ... , 2006-03-19 20:00, 2006-03-19 21:00, 2006-03-19 22:00
6497,0,0, ..., 13,0,1
7511,0,1, ..., 0,0,0

我不确定如何在Scala中实现这个。我认为我需要一个哈希映射的列表或集合,或者是一个hashMap[String => hashMap]。


更新: byHour是一个RDD[(String, Int)]。

val byUserHour = byHour.map(x => (x._1.split("\\+")(0),(x._1.split("\\+")(1),x._2)))
val byUser = byUserHour.groupByKey
val times = byHour.map(x => x._1.split("\\+")(1)).distinct.collect.sortWith(_ < _)
val broadcastTimes = sc.broadcast(times)
val userMaps = byUser.mapValues { 
  x => x.map{
    case(time,cnt) => time -> cnt
  }.toMap
}
val result = userMaps.map {
  case(u,ut) => (u +: broadcastTimes.value.map(ut.getOrElse(_,0).toString))}
val lines = result.map(_.mkString(","))
val header = List("userid") ::: times.toList

首选分布式方法。 - Climbs_lika_Spyder
你的模型中会有一个已知的固定行数吗?(类似于CSV文件?)还是这将取决于数据本身? - maasg
我知道我有多少个用户ID,但有时候我不会知道。 - Climbs_lika_Spyder
抱歉,我想问的是列。由于您正在要求数据结构,因此案例类可能会有所帮助,但对于太多元素(在您的情况下是时间列),它可能变得笨重。 - maasg
我可以扫描数据以查找所有列名,但它们会非常多(每小时几个月)。 - Climbs_lika_Spyder
显示剩余2条评论
1个回答

2

首先,您需要拆分用户ID,这样您就可以得到一个data: Seq[(String, String, Int)]。然后按用户ID进行分组:

val byUser: Map[String, Seq[(String, String, Int)]] = data.groupBy(_._1)

现在我们可以为每个用户创建一个地图:
val userMaps: Map[String, Map[String, Int]] = byUser.mapValues {
  s => s.map {
    case (user, time, n) => time -> n
  }.toMap
}

对于最终的格式化,您首先需要获取不同的时间戳,然后在每个用户映射中查找这些时间戳:

val times: Seq[String] = data.map(_._2).toSet.toList
val result: Seq[Seq[String]] = userMaps.toSeq.map {
  case (u, ut) => (u +: times.map(ut.getOrElse(_, 0).toString))
}
val lines: Seq[String] = result.map(_.mkString(","))

希望这些足以让您入门。您可以在http://twitter.github.io/scala_school/collections.html(以及其他许多地方)阅读更多关于Scala集合的内容。
以上所有内容都是本地计算,没有分布式。要以分布式方式执行相同的操作,您需要在开始时将数据读入RDD(sc.textFile()),并执行大致相同的操作序列。
一个小区别是,您需要使用groupByKey而不是groupBy,它的行为有所不同。从RDD[A, B]中获取RDD[A, Iterable[B]],而不是Map[A, Seq[(A, B)]]
一个重要的区别是,您需要从群集中收集times到应用程序,然后广播到所有节点:
val times: Seq[String] = data.map(_._2).distinct.collect
val broadcast = sc.broadcast(times)
val result: RDD[Seq[String]] = userMaps.map {
  val times = broadcast.value
  case (u, ut) => (u +: times.map(ut.getOrElse(_, 0).toString))
}

我不知道这有多分布式,但它能够工作。谢谢。 - Climbs_lika_Spyder
你说得对,它根本不是分布式的。(我应该注意到这一点。) 我现在已经添加了一个分布式概述,希望它有意义... 如果您有更多问题,请查看http://spark.apache.org/docs/latest/programming-guide.html!(或在这里问。) - Daniel Darabos
感谢您对分布式系统的概述。我已经找到了答案。但是,我不确定我的系统是否足够分布式,因为它可以处理小数据集,但在大数据集(2G)上无法返回结果。 - Climbs_lika_Spyder
是的,这种情况经常发生 :). 可能有几个原因。使用此算法的风险在于不同时间戳的数量可能太大,导致输出中的每一行都变得非常庞大。祝你调试好运! - Daniel Darabos

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接