Spark Streaming中groupByKey和updateStateByKey的实现

7
我正在尝试使用Spark Streaming对从Kafka读取的(假)Apache Web服务器日志运行有状态计算。目标是类似于此博客文章将Web流量“会话化”。唯一的区别在于,我想对IP访问的每个页面进行“会话化”,而不是整个会话。我能够使用Spark批处理模式从虚假Web流量文件中读取并完成此操作,但现在我想在流媒体环境中完成它。
日志文件从Kafka中读取并解析为K / V对,其中包括(String,(String,Long,Long))(IP,(requestPage,time,time))
然后我在这个K / V对上调用groupByKey()。在批处理模式下,这将产生一个:(String,CollectionBuffer((String,Long,Long),...)(IP,CollectionBuffer((requestPage,time,time),...)
在StreamingContext中,它会产生一个:(String,ArrayBuffer((String,Long,Long),...),如下所示:
(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))

然而,随着下一个微批次(DStream)的到来,这些信息被丢弃。

最终我想要的是让 ArrayBuffer 随着给定 IP 的持续交互而填充,并对其数据运行一些计算以“会话化”页面时间。

我相信实现这一目标的操作符是 "updateStateByKey"。但是,我在使用这个操作符时遇到了一些问题(我对 Spark 和 Scala 都很新手);

非常感谢您的帮助。

到目前为止:

val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey) 


    def updateGroupByKey(
                          a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          b: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {

  }
2个回答

2
我认为你正在寻找类似于这样的东西:
 def updateGroupByKey(
                          newValues: Seq[(String, ArrayBuffer[(String, Long, Long)])],
                          currentValue: Option[(String, ArrayBuffer[(String, Long, Long)])]
                          ): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
     //Collect the values
     val buffs: Seq[ArrayBuffer[(String, Long, Long)]] = (for (v <- newValues) yield v._2)
     val buffs2 = if (currentValue.isEmpty) buffs else currentValue.get._2 :: buffs
     //Convert state to buffer
     if (buffs2.isEmpty) None else {
        val key = if (currentValue.isEmpty) newValues(0)._1 else currentValue.get._1
        Some((key, buffs2.foldLeft(new ArrayBuffer[(String, Long, Long)])((v, a) => v++a)))
     }
  }

2
Gabor的回答让我朝着正确的方向开始了,但这里有一个能产生期望输出的答案。首先,我想要的输出是:
(100.40.49.235,List((/,1418934075000,1418934075000), (/,1418934105000,1418934105000), (/contactus.html,1418934174000,1418934174000)))

我不需要 groupByKey()updateStateByKey 已经将值累加到序列中,因此添加 groupByKey 是不必要的(而且昂贵)。Spark 用户强烈建议不要使用 groupByKey
以下是已经有效的代码:
def updateValues( newValues: Seq[(String, Long, Long)],
                      currentValue: Option[Seq[ (String, Long, Long)]]
                      ): Option[Seq[(String, Long, Long)]] = {

  Some(currentValue.getOrElse(Seq.empty) ++ newValues)

  }


val grouped = ipTimeStamp.updateStateByKey(updateValues)

在这里,updateStateByKey被传递了一个函数(updateValues),该函数随着时间的推移累积值(newValues),以及流中当前值(currentValue)的选项。然后返回它们的组合。由于currentValue偶尔可能为空,因此需要使用getOrElse。感谢https://twitter.com/granturing提供正确的代码。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接