我有一个情况需要检查特定的消息是否已经存在于主题中,我需要确保主题中没有重复的消息。
有没有什么优雅的方法来解决这个问题?而不是消耗所有的消息并对它们进行检查。
我有一个情况需要检查特定的消息是否已经存在于主题中,我需要确保主题中没有重复的消息。
有没有什么优雅的方法来解决这个问题?而不是消耗所有的消息并对它们进行检查。
我不认为自己是Kafka的专家,但我认为你所提出的问题“违背”了Kafka的本质。
然而,我使用Java的Kafka Streams库想出了一个解决方案。基本上,过程如下:
将每条消息映射到一个新的键值中,其中键是先前键和其值的组合:(key1, message1) -> (key1-message1, message1)
使用键分组消息,这个操作的结果是获得了一个KGroupedStream。
应用reduce函数,将值修改为一些自定义值,例如字符串“Duplicated value”。
在减少后将生成的KTable转换为KStream,并将其推送到新的Kafka主题中。
在上述说明中有很多假设,我将提供一些代码以便给出一些提示:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> resources = builder.stream("topic-where-the-messages-are-sent");
KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() {
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<String, String>(key + "-" + value, value);
}
};
Reducer<String> reducer = new Reducer<String>() {
public String apply(String value1, String value2) {
return "Duplicated message";
}
};
resources.map(kvMapper)
.groupByKey()
.reduce(reducer, "test-store-name")
.toStream()
.to("unique-message-output");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();