Spring-kafka监听器并发性

7
我使用 spring-kafka lib 实现了 Kafka 消费者。我有一个 Kafka 主题,其中包含 2 个分区,并且我使用 ConcurrentKafkaListenerContainerFactory 并将并发级别设置为 2,因此每个容器实例应按照 spring-kafka documentation 的规定从单个分区中消费。

KafkaMessageListenerContainer 在单个线程上接收所有主题/分区的所有消息。ConcurrentMessageListenerContainer 委托给 1 个或多个 KafkaMessageListenerContainer 来提供多线程消费。

这是我的消费者类:
@Component
public class KafkaConsumer {
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
        String message = record.value().toString();
        Event event = EventFactory.createEvent(message);
        String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
        // add event to hashMap
        LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
        if (queue == null) {
            queue = new LinkedBlockingQueue<>();
            queue.add(event);
            hashMap.put(customerId, queue);
        } else {
            queue.add(event);
        }
    }
}

正如您所看到的,我在“hashMap”集合中根据消息“customer_id”属性将我的事件放入相应的队列中。这种功能需要额外的同步以防止多个线程访问,并且正如我所看到的,Spring-Kafka为所有容器创建了一个bean实例,而不是为每个容器创建一个单独的bean实例以避免并发问题。
我该如何通过编程方式更改此逻辑?
我认为唯一奇怪的解决此问题的方法是使用两个运行单个线程消费者的单独应用程序的JVM,因此可以单线程访问带有#receive方法的KafkaConsumer类。
1个回答

6

没错,就是这样的。该框架确实不依赖于bean,只依赖于其方法将消息传递给函数。

您可以考虑为主题中的每个分区拥有两个@KafkaListener方法。确实,来自一个分区的记录会在单个线程中传递到@KafkaListener。因此,如果您真的无法接受这种状态,可以为每个线程使用两个HashMap

该监听器抽象背后的一般思想正是关于无状态行为的。那个KafkaConsumer是常规Spring singleton bean。您必须接受这个事实并根据这种情况重新设计您的解决方案。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接