如何从Kafka流获取窗口聚合?

6
我有一组事件流,想根据时间窗口进行聚合。我的解决方案提供了增量聚合而不是按时间窗口聚合。我了解到这在流处理中很常见,因为它会给出更改日志作为结果。在研究中,我看到了使用Kafka Streams DSL进行两步窗口聚合如何发送时间窗口KTable的最终Kafka Streams聚合结果? 但是第一个问题的解决方案有点过时(使用已弃用的API)。我使用了那些建议替代已弃用API的新API。这就是我的解决方案。
KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
    KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
             String groupBy = getGroupBy(value, criteria);
             return groupBy;
    }, Serialized.with(Serdes.String(), eventSerde));


    long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
    final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
            .windowedBy(TimeWindows.of(windowSizeMs)
                    .advanceBy(windowSizeMs));

但是,我之前已经解释过,我的结果不是在特定的时间窗口内给出的,而是作为递增的聚合输出。我需要使我的数据按照指定的时间窗口大小进行输出。同时,我了解到 CACHE_MAX_BYTES_BUFFERING_CONFIG 可以控制输出,但我需要一种在任何情况下都能够可靠运行的解决方案。还要注意的是,https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 给出的模式现在已经过时,因为它使用的是旧的API。(我正在使用 kafka-streams 1.1.0 版本)

1个回答

5
问题出在我的失误上。上面的代码示例运行良好。但是最后我将 KTable 转换为 KStream。这就是问题所在。将其转换为 KStream 也会输出中间结果。在https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 中提供的模式可以正常工作。我有问题的代码如下,
// Aggregation

KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());

// This converstion causing changelog to output. Instead use next line.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
                .mapValues(this::convertToAggregationMessage).filter((k, v) -> v != null);

// output KTable to sample topic. But this output controlled by 
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters. 
// I'm using default values for these params.
results.to(windowedSerde, eventSerde,  "Sample");

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接