Kafka Streams - 在KTable上更新聚合

10

我有一个KTable,其中包含以下形式的数据(key => value),其中键是客户ID,值是包含一些客户数据的小型JSON对象:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "18-24"}
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我想在这个KTable上进行一些聚合,基本上对每个age_group的记录数进行计数。期望的KTable数据如下:

"18-24" => 3
"25-30" => 1

假设一个名叫Alice的人,她的年龄在18-24岁年龄段内,生日过后将进入新的年龄段。支持第一个KTable的状态存储现在应该看起来像这样:

1 => { "name" : "John", "age_group":  "25-30"}
2 => { "name" : "Alice", "age_group": "25-30"} # Happy Cake Day
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

我希望最终聚合的KTable结果能够反映这一点。例如:

"18-24" => 2
"25-30" => 2

我可能过于笼统地描述了这里所述的问题:

Kafka Streams中没有所谓的最终聚合...根据您的用例,手动去重是解决该问题的一种方法"

但到目前为止,我只能计算一个运行总数,例如,Alice的生日会被解释为:

"18-24" => 3 # Old Alice record still gets counted here
"25-30" => 2 # New Alice record gets counted here as well

编辑: 这里还有一些我注意到的似乎意外的行为。

我使用的拓扑结构看起来像:

dataKTable = builder.table("compacted-topic-1", "users-json")
    .groupBy((key, value) -> KeyValue.pair(getAgeRange(value), key))
    .count("age-range-counts")

1) 空状态

现在,从初始的空状态开始,所有内容都是这样的:


compacted-topic-1
(empty)


dataKTable
(empty)


// groupBy()
Repartition topic: $APP_ID-age-range-counts-repartition
(empty)

// count()
age-range-counts state store
(empty)

2) 发送几条消息

现在,让我们向作为上面KTable流的compacted-topic-1发送一条消息。这是发生的事情:


compacted-topic-1
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }

dataKTable
3 => { "name" : "Susie", "age_group": "18-24" }
4 => { "name" : "Jerry", "age_group": "18-24" }


// groupBy()
// why does this generate 4 events???
Repartition topic: $APP_ID-age-range-counts-repartition
18-24 => 3
18-24 => 3
18-24 => 4
18-24 => 4

// count()
age-range-counts state store
18-24 => 0

我在想:
我是否可以使用Kafka Streams 0.10.1或0.10.2来实现我正在尝试做的事情? 我已经尝试在DSL中使用groupBy和count,但也许我需要使用reduce之类的东西吗?
此外,我有一些困难理解导致调用add减法器和subtract减法器的情况,因此任何关于这些点的澄清将不胜感激。

你尝试了什么?你卡在哪里了? - Matthias J. Sax
1个回答

8
如果您有包含 "id -> Json" 数据的原始KTable(称其为dataKTable),则可以通过以下方式获取所需内容:
KTable countKTablePerRange
    = dataKTable.groupBy(/* map your age-range to be the key*/)
                .count("someStoreName");

这应该适用于Kafka Streams API的所有版本。

更新

关于重新分区主题中的4个值:那是正确的。对“基本KTable”的每次更新都会写入其“旧值”和“新值”的记录。这是为了正确更新下游KTable所必需的。旧值必须从一个计数中删除,而新值必须添加到另一个计数中。因为您(计数)的KTable可能是分布式的(即在多个并行运行的应用程序实例之间共享),因此两个记录(旧记录和新记录)可能会以不同的方式结束在不同的实例上,因此它们必须作为两个独立的记录发送。(记录格式应比您在问题中显示的更复杂。)

这也解释了为什么您需要减法器和加法器。减法器从聚合结果中删除旧记录,而加法器将新记录添加到聚合结果中。

仍然不确定为什么您在结果中看不到正确的计数。您运行了多少个实例?也许尝试通过在StreamsConfig中设置cache.max.bytes.buffering=0来禁用KTable缓存。


1
看到我的编辑了吗?这对我不起作用 :( 任何帮助都将不胜感激。这似乎很简单 - 但我不理解我在上面的编辑中描述的行为。 - foxygen
2
终于搞定了。不知怎么回事,我的状态存储处于一种奇怪的状态。禁用缓存并在我的开发环境中重置主题解决了这个问题。谢谢Matthias,你总是很有帮助 :) - foxygen
@foxygen,我遇到了同样的问题,在你的情况下(年龄组变化),我的groupingKey也会改变,但是它不会调用subtractor,请问你是如何解决这个问题的? - Abhishek

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接