Kafka Stream 抑制会话窗口聚合

14

我已经在Kafka流应用程序中编写了这段代码:

KGroupedStream<String, foo> groupedStream = stream.groupByKey();
groupedStream.windowedBy(
SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3)))
    .aggregate(() -> {...})
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .toStream()...

如果我理解正确,应该在窗口关闭后按键发出记录。但行为如下:

流不会发出第一条记录,只有在第二个记录之后才将其转发,即使有不同的键,然后第二个记录仅在第三个记录之后发出,依此类推。

我尝试了多个StreamConfigs,包括“exactly_once”以及使用或不使用缓存,但这种行为仍然存在。

提前感谢您的帮助!


如果你想按时间段而不是“会话”聚合数据,我猜你需要使用TimeWindows而不是SessionWindows - Vasyl Sarzhynskyi
那对我没起作用。设定了一个时间窗口,但直到为同一个键添加了新事件,旧窗口的抑制效果才能完成。非常令人沮丧和反直觉! - BalRog
2个回答

18

这是预期的行为。请注意,suppress() 是基于事件时间的。因此,只要没有新的数据到达,时间就不会前进,因此提前驱逐记录将是错误的,因为不能保证下一条记录可能属于当前窗口。


感谢您的快速回复。 在文档中,它指出“在定义了窗口计算之后,您可以抑制中间结果,在窗口关闭时发出每个用户的最终计数”,因此我认为无论我有会话还是时间窗口,只有在窗口关闭后才会发出结果,并且我可以针对每个记录键同时拥有多个窗口。 如果不是这种情况,那么实现这种行为的想法是什么? - dborn
我现在认为我理解了这个行为。非常感谢!那么,使用KStream连接触发某些具有特定键的记录的“驱逐”是否可能? - dborn
1
你在第一条评论中说的都是正确的。关键是,窗口只有在事件时间推进时才能关闭,因此这只会在输入中出现具有较大时间戳的新记录之后发生。-- 不过我不理解你的第二条评论。 - Matthias J. Sax
正确。如果没有新记录,则无法驱逐数据,否则将违反“suppress()”的契约。我知道在测试场景中这可能很笨拙,但在实际部署中,这是流处理,因此没有问题。 - Matthias J. Sax
1
@ValBonn 我构建了一个解决方法,它是一种虚拟流,其中包含一个标点符号器,根据墙上时钟时间执行定期操作(以推动流时间 xD)。 - dborn
显示剩余4条评论

-1

我认为使用“suppress()”的“Session Window”不会产生任何输出。

如有错误,请纠正。据我所知,“suppress()”仅适用于基于时间的窗口,而不适用于基于会话的窗口。


1
suppress 可与 SessionWindows 一起使用 - 您需要确保使用并定义 grace 期间 - 否则将应用默认值 (24h - gapMs),因此窗口在此宽限期内将 '不会关闭'。 另请参见 org.apache.kafka.streams.kstream.SessionWindows#gracePeriodMs || 示例:.windowedBy(SessionWindows.with(Duration.ofSeconds(10)).grace(Duration.ofSeconds(10))) - Hartmut

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接