整合Spring Boot和Reactor-Kafka的KafkaReceiver

9
我正在尝试开发一个使用库reactor-kafka的Spring Boot应用程序,以响应从Kafka主题读取的一些消息。
我有一个配置类,构建了一个KafkaReceiver
@Configuration
public class MyConfiguration {

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver() {
        Map<String, Object> props = new HashMap<>();
        // Options initialisation...
        final ReceiverOptions<String, String> receiverOptions =
                ReceiverOptions.<String, string>create(props)
                               .subscription(Collections.singleton(consumer.getTopic()));
        return KafkaReceiver.create(receiverOptions);
    } 
}

好的...现在呢?使用不太反应灵敏的spring-kafka库,我可以使用@KafkaListener注释一个方法,Spring Boot将为我创建一个从Kafka主题监听的线程。
那么KafkaReceiver应该放在哪里?在我找到的所有示例中,它们直接使用main方法,但这不是Boot way
我正在使用Spring Boot 2.1.3和Reactor-Kafka 1.1.0。
提前致谢。
1个回答

7

既然您拥有了KafkaReceiver bean,现在您可以这样做:

@Bean
public ApplicationRunner runner(KafkaReceiver<String, String> kafkaReceiver) {
        return args -> {
                kafkaReceiver.receive()
                          ...
                          .sunbscribe();
        };
}

ApplicationContext准备好时,将启动这个ApplicationRunner bean。有关更多信息,请查看它的JavaDocs。


2
非常感谢。也许你可以把这个集成添加到文档的某个地方 :) - riccardo.cardin
1
如何运行同一消费者组的多个消费者? - user1955934
您介意发起一个新的SO线程并提供更多细节吗?看起来与旧线程无关... - Artem Bilan

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接