Apache Kafka:检查主题中消息的存在性

4

我有一个情况需要检查特定的消息是否已经存在于主题中,我需要确保主题中没有重复的消息。

有没有什么优雅的方法来解决这个问题?而不是消耗所有的消息并对它们进行检查。

1个回答

0

我不认为自己是Kafka的专家,但我认为你所提出的问题“违背”了Kafka的本质。

然而,我使用Java的Kafka Streams库想出了一个解决方案。基本上,过程如下:

  • 将每条消息映射到一个新的键值中,其中键是先前键和其值的组合:(key1, message1) -> (key1-message1, message1)

  • 使用键分组消息,这个操作的结果是获得了一个KGroupedStream

  • 应用reduce函数,将值修改为一些自定义值,例如字符串“Duplicated value”。

  • 在减少后将生成的KTable转换为KStream,并将其推送到新的Kafka主题中。

在上述说明中有很多假设,我将提供一些代码以便给出一些提示:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources =  builder.stream("topic-where-the-messages-are-sent");

KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
    public KeyValue<String, String> apply(String key, String value) {
        return new KeyValue<String, String>(key + "-" + value, value);
    }
};

Reducer<String> reducer = new Reducer<String>() {
    public String apply(String value1, String value2) {
        return "Duplicated message";
    }
};

resources.map(kvMapper)
    .groupByKey()
    .reduce(reducer, "test-store-name")
    .toStream()
    .to("unique-message-output");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();

请注意,这可能不是最优解决方案,也许您不会认为它是解决问题的“优雅”方式。
希望能对您有所帮助。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接