Kafka multi-threaded consumer with manual offset commits: KafkaConsumer is not safe for multi-threaded access

I use an ArrayBlockingQueue to decouple the Kafka consumers from the sinks:
  1. Consume Kafka on multiple threads, with one KafkaConsumer per thread;
  2. each Kafka consumer manages its offsets manually;
  3. the Kafka consumer wraps the message value and an ack callback carrying the OFFSET into a Record object, and puts it on an ArrayBlockingQueue;
  4. a sink takes records from the ArrayBlockingQueue and processes them. Only after the sink has processed a record successfully does it invoke the Record's callback (notifying the Kafka consumer to commitSync).

While running this, I hit an error that has puzzled me for days. I can't work out which part is wrong:

11:44:10.794 [pool-2-thread-1] ERROR com.alibaba.kafka.source.KafkaConsumerRunner - [pool-2-thread-1] ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1824)
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:1808)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1255)
    at com.alibaba.kafka.source.KafkaConsumerRunner$1.call(KafkaConsumerRunner.java:75)
    at com.alibaba.kafka.source.KafkaConsumerRunner$1.call(KafkaConsumerRunner.java:71)
    at com.alibaba.kafka.sink.Sink.run(Sink.java:25)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
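
For context, this exception comes from KafkaConsumer's single-thread guard: every public method first calls acquire(), which records the id of the thread currently inside the consumer and throws if a different thread tries to enter. A simplified sketch of that guard (paraphrased from the Kafka client source, not verbatim):

private void acquire() {
    long threadId = Thread.currentThread().getId();
    // NO_CURRENT_THREAD means "no thread is inside the consumer right now"
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
        throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
}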

Source code:

Queues.java

public class Queues {
    public static volatile BlockingQueue[] queues;

    /**
     * Create Multiple Queues.
     * @param count The number of queues created.
     * @param capacity The Capacity of each queue.
     */
    public static void createQueues(final int count, final int capacity) {
        Queues.queues = new BlockingQueue[count];
        for (int i=0; i<count; ++i) {
            Queues.queues[i] = new ArrayBlockingQueue(capacity, true);
        }
    }
}

Record.java

@Builder
@Getter
public class Record {
    private final String value;
    private final Callable<Boolean> ackCallback;
}
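
@Builder and @Getter here are Lombok annotations. For readers without Lombok, they expand to roughly this hand-written equivalent (a sketch, with names as Lombok would generate them):

public class Record {
    private final String value;
    private final Callable<Boolean> ackCallback;

    private Record(String value, Callable<Boolean> ackCallback) {
        this.value = value;
        this.ackCallback = ackCallback;
    }

    public String getValue() { return value; }
    public Callable<Boolean> getAckCallback() { return ackCallback; }

    public static RecordBuilder builder() { return new RecordBuilder(); }

    public static class RecordBuilder {
        private String value;
        private Callable<Boolean> ackCallback;

        public RecordBuilder value(String value) { this.value = value; return this; }
        public RecordBuilder ackCallback(Callable<Boolean> ackCallback) { this.ackCallback = ackCallback; return this; }
        public Record build() { return new Record(value, ackCallback); }
    }
}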

Sink.java

public class Sink implements Runnable {
    private final int queueId;

    public Sink(int queueId) {
        this.queueId = queueId;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Record record = (Record) Queues.queues[this.queueId].take();
                // (1) Handler: Write to database
                Thread.sleep(10);
                // (2) ACK: notify kafka consumer to commit offset manually
                record.getAckCallback().call();
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
}

KafkaConsumerRunner.java

@Slf4j
public class KafkaConsumerRunner implements Runnable {
    private final String topic;
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner(String topic, Properties properties) {
        this.topic = topic;
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void run() {
        // offsets to commit
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        // Subscribe topic
        this.consumer.subscribe(Collections.singletonList(this.topic));
        // Consume Kafka Message
        while (true) {
            try {
                ConsumerRecords<String, String> consumerRecords = this.consumer.poll(10000L);
                for (TopicPartition topicPartition : consumerRecords.partitions()) {
                    for (ConsumerRecord<String, String> consumerRecord : consumerRecords.records(topicPartition)) {
                        // (1) Restore [partition -> offset] Map
                        offsetsToCommit.put(topicPartition, new OffsetAndMetadata(consumerRecord.offset()));
                        // (2) Put into queue
                        int queueId = topicPartition.partition() % Queues.queues.length;
                        Queues.queues[queueId].put(Record.builder()
                                .value(consumerRecord.value())
                                .ackCallback(this.getAckCallback(offsetsToCommit))
                                .build());
                    }
                }
            } catch (ConcurrentModificationException | InterruptedException e) {
                log.error("[{}] {}", Thread.currentThread().getName(), ExceptionUtils.getMessage(e), e);
                System.exit(1);
            }
        }
    }

    private Callable<Boolean> getAckCallback(Map<TopicPartition, OffsetAndMetadata> offsets) {
        return new AckCallback<Boolean>(this.consumer, new HashMap<>(offsets)) {
            @Override
            public Boolean call() throws Exception {
                try {
                    this.getConsumer().commitSync(this.getOffsets());
                    return true;
                } catch (Exception e) {
                    log.error(String.format("[%s] %s", Thread.currentThread().getName(), ExceptionUtils.getMessage(e)), e);
                    return false;
                }
            }
        };
    }

    @Getter
    @AllArgsConstructor
    abstract class AckCallback<T> implements Callable<T> {
        private final KafkaConsumer<String, String> consumer;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
    }
}

Application.java

public class Application {
    private static final String TOPIC = "YEWEI_TOPIC";
    private static final int QUEUE_COUNT = 1;
    private static final int QUEUE_CAPACITY = 4;
    
    private static void createQueues() {
        Queues.createQueues(QUEUE_COUNT, QUEUE_CAPACITY);
    }

    private static void startupSource() {
        if (null == System.getProperty("java.security.auth.login.config")) {
            System.setProperty("java.security.auth.login.config", "jaas.conf");
        }

        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cdh1:9092,cdh2:9092,cdh3:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        ExecutorService executorService = Executors.newFixedThreadPool(QUEUE_COUNT);
        for (int queueId = 0; queueId < QUEUE_COUNT; ++queueId) {
            executorService.execute(new KafkaConsumerRunner(TOPIC, properties));
        }
    }

    private static void startupSinks() {
        ExecutorService executorService = Executors.newFixedThreadPool(QUEUE_COUNT);
        for (int queueId = 0; queueId < QUEUE_COUNT; ++queueId) {
            executorService.execute(new Sink(queueId));
        }
    }

    public static void main(String[] args) {
        Application.createQueues();
        Application.startupSource();
        Application.startupSinks();
    }
}

Instead of creating multiple BlockingQueues, create a single BlockingQueue and share that queue across the threads. BlockingQueues are inherently thread-safe.
1 Answer

I solved this problem. The Kafka consumer runs in its own thread, but it was also being called back from the Sink thread; KafkaConsumer's poll and commitSync methods may only be invoked from a single thread. See org.apache.kafka.clients.consumer.KafkaConsumer#acquireAndEnsureOpen.
The change: the Sink callback no longer uses the consumer object directly. Instead, it puts the ACK message onto a LinkedTransferQueue, and KafkaConsumerRunner drains that LinkedTransferQueue on every loop iteration and processes the ACKs in batch.

@Slf4j
public class KafkaConsumerRunner implements Runnable {
    private final String topic;
    private final BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> ackQueue;
    private final KafkaConsumer<String, String> consumer;

    public KafkaConsumerRunner(String topic, Properties properties) {
        this.topic = topic;
        this.ackQueue = new LinkedTransferQueue<>();
        this.consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void run() {
        // Subscribe topic
        this.consumer.subscribe(Collections.singletonList(this.topic));
        // Consume Kafka Message
        while (true) {
            // Drain pending ACKs first: commitSync now always runs on the
            // consumer's own thread, satisfying KafkaConsumer's single-thread guard.
            while (!this.ackQueue.isEmpty()) {
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = this.ackQueue.take();
                    this.consumer.commitSync(offsets);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            ...
        }
    }

    private Callable<Boolean> getAckCallback(Map<TopicPartition, OffsetAndMetadata> offsets) {
        return new AckCallback<Boolean>(new HashMap<>(offsets)) {
            @Override
            public Boolean call() throws Exception {
                try {
                    // Enqueue the snapshot captured when this callback was created,
                    // not the live map that the consumer loop keeps mutating.
                    ackQueue.put(this.getOffsets());
                    return true;
                } catch (Exception e) {
                    log.error(String.format("[%s] %s", Thread.currentThread().getName(), ExceptionUtils.getMessage(e)), e);
                    System.exit(1);
                    return false;
                }
            }
        };
    }

    ...
}
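
As a further refinement (my own sketch, not part of the original answer): the pending ACKs can be drained in one shot with BlockingQueue.drainTo and merged into a single commitSync per loop iteration, so one commit covers the newest acknowledged offset of each partition:

// Inside the consumer loop, replacing the isEmpty()/take() drain:
List<Map<TopicPartition, OffsetAndMetadata>> acks = new ArrayList<>();
this.ackQueue.drainTo(acks);
if (!acks.isEmpty()) {
    Map<TopicPartition, OffsetAndMetadata> merged = new HashMap<>();
    for (Map<TopicPartition, OffsetAndMetadata> ack : acks) {
        // Later ACKs for the same partition overwrite earlier ones, so the
        // merged map ends up holding the most recent offset per partition.
        merged.putAll(ack);
    }
    this.consumer.commitSync(merged);
}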


Hi @yewei.oyyw, can you tell me how to configure a multi-threaded Camel Kafka consumer? I know we need to set the number of consumers equal to the number of Kafka partitions, but I'd like to know how to have multiple threads inside a single Kafka consumer consuming messages.
