Kafka Stream Suppress在窗口操作后未产生输出

3

我正在对从流环境中传入的对象进行窗口化处理,收集并打印出来。 使用Kafka suppress避免中间结果。

使用suppress后,我无法输出任何内容。如果我将suppress注释掉,则代码可以正常工作但会打印中间结果。

import com.savk.workout.kafka.streams.kafkastreamsworkout.config.ConfigUtils;
import com.savk.workout.kafka.streams.kafkastreamsworkout.model.Observation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Properties;

@Component
public class ObservationAnalyser {

    @PostConstruct
    public void initialize() {
        StreamsBuilder streamsBuilder;
        KStream<String, Observation> observationKStream;

        String observationSerde = ConfigUtils.getObservationSerde();        //TODO : Should we move to a JSON Serde?
        Properties kafkaStreamProperties = ConfigUtils.getKafkaStreamConfig();
        kafkaStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        kafkaStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, observationSerde);

        //JsonSerde<ObservationCollector> observationCollectorJsonSerde = new JsonSerde<>(ObservationCollector.class);

        streamsBuilder = new StreamsBuilder();
        observationKStream = streamsBuilder.stream(ConfigUtils.KAFKA_SOURCE_TOPIC);
        observationKStream
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(ConfigUtils.ONE_MINUTE_IN_MILLISECONDS)))
                .aggregate(
                        () -> new ObservationCollector(),
                        (key, value, observationCollector) -> observationCollector.addObservations(value),
                        Materialized.with(Serdes.String(), new JsonSerde<>(ObservationCollector.class))
                )
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))    //AFTER COMMENTING THIS LINE, I CAN SEE THE OUTPUT
                .toStream((key, value) -> key.key())
                .foreach((key, observationCollector) -> {
                    System.out.println("Key :: " + key);
                    for(Observation observation : observationCollector.getObservations())   {
                        System.out.println("Observation :: " + observation);
                    }
                });

        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamProperties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
    }

}

我无法找出问题或找到解决问题的资源。


FYI,有一个生产者每4秒钟就会不断地生产数据。 - Anthony
1个回答

1
默认的“宽限期”为1天(出于向后兼容性考虑,如果未使用suppress())。因此,在事件时间提前1天之前,窗口不会关闭。
您可能希望通过以下方式减少宽限期(也可能是保留时间):
  • TimeWindows.of(..).grace(...)
  • Materialized.with(...).withRetentionTime(...)

.windowedBy(TimeWindows.of(Duration.ofMillis(ConfigUtils.ONE_MINUTE_IN_MILLISECONDS)).grace(Duration.ofMillis(0))) 按时间窗口分组(每分钟一个窗口),不容忍延迟。 - Anthony
请问您能提供这个信息的资源链接吗? - Anthony
我可以知道在哪里/如何使用 "Materialized.with(...).withRetentionTime(...)" 吗? - Anthony
1
我猜很遗憾,文档的数量是有限的... suppress() 的原始设计文档可能会有所帮助:https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables -- 对于保留时间,只有在使用“交互式查询”时才有意义设置比所需更长的保留时间-- 我不确定从头脑中是否立即设置宽限期将自动设置较短的保留时间。如果没有,您可以通过设置较短的保留时间来减少存储占用。 - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接