我正在使用Kafka Streams来计算在一个滚动时间窗口内发生了多少事件,该时间窗口为3分钟:
当运行消费者并将一些消息推送到输入主题时,随着时间的推移,它会接收到以下更新:
几乎是正确的,但它从未接收以下更新:
没有它,我的消费者在较长一段时间内没有事件时将永远无法将计数器归零。我期望消耗的消息看起来像这样:
有没有一些我忽略的配置选项/参数可以帮助我实现这种行为?
public class ViewCountAggregator {
void buildStream(KStreamBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
KStream<String, Long> viewCount = views
.groupBy((key, value) -> value)
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value));
viewCount.to(stringSerde, longSerde, "streams-view-count-output");
}
public static void main(String[] args) throws Exception {
// some not so important initialization code
...
}
}
当运行消费者并将一些消息推送到输入主题时,随着时间的推移,它会接收到以下更新:
single 1
single 1
single 1
five 1
five 4
five 5
five 4
five 1
几乎是正确的,但它从未接收以下更新:
single 0
five 0
没有它,我的消费者在较长一段时间内没有事件时将永远无法将计数器归零。我期望消耗的消息看起来像这样:
single 1
single 1
single 1
single 0
five 1
five 4
five 5
five 4
five 1
five 0
有没有一些我忽略的配置选项/参数可以帮助我实现这种行为?