我正在尝试使用Spark Streaming对从Kafka读取的(假)Apache Web服务器日志运行有状态计算。目标是类似于此博客文章将Web流量“会话化”。唯一的区别在于,我想对IP访问的每个页面进行“会话化”,而不是整个会话。我能够使用Spark批处理模式从虚假Web流量文件中读取并完成此操作,但现在我想在流媒体环境中完成它。
日志文件从Kafka中读取并解析为
然后我在这个
在StreamingContext中,它会产生一个:
日志文件从Kafka中读取并解析为
K / V
对,其中包括(String,(String,Long,Long))
或(IP,(requestPage,time,time))
。然后我在这个
K / V
对上调用groupByKey()
。在批处理模式下,这将产生一个:(String,CollectionBuffer((String,Long,Long),...)
或(IP,CollectionBuffer((requestPage,time,time),...)
。在StreamingContext中,它会产生一个:
(String,ArrayBuffer((String,Long,Long),...)
,如下所示:(183.196.254.131,ArrayBuffer((/test.php,1418849762000,1418849762000)))
然而,随着下一个微批次(DStream)的到来,这些信息被丢弃。
最终我想要的是让 ArrayBuffer
随着给定 IP 的持续交互而填充,并对其数据运行一些计算以“会话化”页面时间。
我相信实现这一目标的操作符是 "updateStateByKey
"。但是,我在使用这个操作符时遇到了一些问题(我对 Spark 和 Scala 都很新手);
非常感谢您的帮助。
到目前为止:
val grouped = ipTimeStamp.groupByKey().updateStateByKey(updateGroupByKey)
def updateGroupByKey(
a: Seq[(String, ArrayBuffer[(String, Long, Long)])],
b: Option[(String, ArrayBuffer[(String, Long, Long)])]
): Option[(String, ArrayBuffer[(String, Long, Long)])] = {
}