在Spark Streaming中使用updateStateByKey()从原始事件流生成状态更改流。

5

我刚开始寻找Spark Streaming的有状态计算解决方案,发现了updateStateByKey()函数。

我想要解决的问题是: 10,000个传感器每分钟产生一个二进制值。

如果一个传感器连续报告的值与上一个不同,我希望能够标记并将其作为状态更改事件发送到Kafka。

我假设在这个例子中可以使用updateStateByKey()函数,但我不完全清楚实现相同功能的推荐方法。

2个回答

3

我假设你将从传感器获取一个(String, Int)的流,其中String是传感器的ID,Int是传感器返回的二进制值。在这个假设下,你可以尝试以下代码:

val sensorData: DStream[(String, Int)] = ???

val state = sensorData.updateStateByKey[(String, Int)](updateFunction _)

def updateFunction(newValues: Seq[(String, Int)], currentValues: Seq[(String, Int)]) = {
    val newValuesMap = newValues.toMap
    val currentValuesMap = currentValues.toMap

    currentValuesMap.keys.foreach ( (id) =>
            if(currrentValuesMap.get(id) != newValuesMap.getOrElse(id, -1)) {
                //send to Kafka
            }
    )       
    Some(newValues)
}

1
你关于 (String, Int) 对的想法是正确的,我会尝试你的代码并看看它的效果如何。谢谢。 - Arun Jose
嗨,Patrick,你能用.mapWithState()做同样的过程吗? - vdep

1

州的生命周期/资源是否可以进行管理?或者它总是在增长?

例如,在此会话示例中,状态将永远增长,对吗?是否有任何方法可以管理它,以便您可以清除/存档仅聚合3个月的数据或其他内容?


你可以返回任何你想要的新状态,所以你可以自由地进行管理。在我上面的答案示例中,每次只存储当前状态,因此它不会增长。最好是将其作为一个新问题提出,而不是作为原始帖子的答案。 - Patrick McGloin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接