使用Observable(RxJava)通过Kafka传递消息

5

我有一个使用Kafka的生产者,并且有多个消费者。因此,我发布一个主题中的消息,然后我的消费者接收和处理该消息。

我需要从至少一个消费者中在生产者端接收响应(最好是第一个)。 我正在尝试使用RxJava来实现它(可观察对象)。

这样做有可能吗? 有人有例子吗?

3个回答

7

我是如何使用rxjava '2.2.6'处理Kafka事件的:

import io.reactivex.Observable;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

...

// Load consumer props 
Properties props = new Properties();  
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties")); 

// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\\s*,\\s*")));

// Create an Observable for topic events
Observable<ConsumerRecords<String, String>> observable = Observable.fromCallable(() -> {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(10);
    return records;
});

// Process Observable events
observable.subscribe(records -> {
    if ((records != null) && (!records.isEmpty())) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }
});


3
您可以按照以下方式使用它:
val consumer = new RxConsumer("zookeeper:2181", "consumer-group")

consumer.getRecordStream("cool-topic-(x|y|z)")
  .map(deserialize)
  .take(42 seconds)
  .foreach(println)

  consumer.shutdown()

更多信息请查看:https://github.com/cjdev/kafka-rx

1
kafka-rx 还在维护吗?我正在考虑使用它,但自 2015 年 10 月以来没有进行过任何提交。此外,我从未使用过 EPL v.1 许可证。不确定它是否会被我的项目赞助商接受。 - Farrukh Najmi
2
看起来kafka-rx正在使用旧版的kafka和Java 8版本。这个解决方案不应该被接受作为答案。在我看来,理想的答案应该解决如何仅使用rxjava来完成此操作。 - Farrukh Najmi

0

首先,最好您先分享您的解决方案...

由于Spring Cloud Stream是一种流解决方案,而不是请求/响应,因此没有可以与您分享的示例。

您可以考虑将您的消费者也作为生产者。在原始生产者中,有一个消费者从回复主题中读取。最后,您将必须将回复数据与请求数据相关联。

RxJava或任何其他实现细节都与此无关。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接