我有一个KTable,其中包含以下形式的数据(key => value),其中键是客户ID,值是包含一些客户数据的小型JSON对象:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
我想在这个KTable上进行一些聚合,基本上对每个age_group
的记录数进行计数。期望的KTable数据如下:
"18-24" => 3
"25-30" => 1
假设一个名叫Alice
的人,她的年龄在18-24
岁年龄段内,生日过后将进入新的年龄段。支持第一个KTable的状态存储现在应该看起来像这样:
1 => { "name" : "John", "age_group": "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
我希望最终聚合的KTable结果能够反映这一点。例如:
"18-24" => 2
"25-30" => 2
我可能过于笼统地描述了这里所述的问题:
Kafka Streams中没有所谓的最终聚合...根据您的用例,手动去重是解决该问题的一种方法"
但到目前为止,我只能计算一个运行总数,例如,Alice的生日会被解释为:
"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well
编辑: 这里还有一些我注意到的似乎意外的行为。
我使用的拓扑结构看起来像:
dataKTable = builder.table("compacted-topic-1", "users-json")
.groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
.count("age-range-counts")
1) 空状态
现在,从初始的空状态开始,所有内容都是这样的:
compacted-topic-1
(empty)
dataKTable
(empty)
// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)
// count()
age-range-counts state store
(empty)
2) 发送几条消息
现在,让我们向作为上面KTable流的compacted-topic-1
发送一条消息。这是发生的事情:
compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }
// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4
// count()
age-range-counts state store
18-24 => 0
我在想:
我是否可以使用Kafka Streams 0.10.1或0.10.2来实现我正在尝试做的事情? 我已经尝试在DSL中使用groupBy和count,但也许我需要使用reduce之类的东西吗?
此外,我有一些困难理解导致调用add减法器和subtract减法器的情况,因此任何关于这些点的澄清将不胜感激。