我使用 spring-kafka lib 实现了 Kafka 消费者。我有一个 Kafka 主题,其中包含 2 个分区,并且我使用
正如您所看到的,我在“hashMap”集合中根据消息“customer_id”属性将我的事件放入相应的队列中。这种功能需要额外的同步以防止多个线程访问,并且正如我所看到的,Spring-Kafka为所有容器创建了一个bean实例,而不是为每个容器创建一个单独的bean实例以避免并发问题。
我该如何通过编程方式更改此逻辑?
我认为唯一奇怪的解决此问题的方法是使用两个运行单个线程消费者的单独应用程序的JVM,因此可以单线程访问带有#receive方法的KafkaConsumer类。
ConcurrentKafkaListenerContainerFactory
并将并发级别设置为 2,因此每个容器实例应按照 spring-kafka documentation 的规定从单个分区中消费。
这是我的消费者类:KafkaMessageListenerContainer 在单个线程上接收所有主题/分区的所有消息。ConcurrentMessageListenerContainer 委托给 1 个或多个 KafkaMessageListenerContainer 来提供多线程消费。
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
正如您所看到的,我在“hashMap”集合中根据消息“customer_id”属性将我的事件放入相应的队列中。这种功能需要额外的同步以防止多个线程访问,并且正如我所看到的,Spring-Kafka为所有容器创建了一个bean实例,而不是为每个容器创建一个单独的bean实例以避免并发问题。
我该如何通过编程方式更改此逻辑?
我认为唯一奇怪的解决此问题的方法是使用两个运行单个线程消费者的单独应用程序的JVM,因此可以单线程访问带有#receive方法的KafkaConsumer类。