如何在kafka 0.9.0中使用多线程消费者?

10

kafka文档提供了一种方法,如下所述:

每个线程一个消费者:一个简单的选择是为每个线程分配一个独立的消费者实例。

我的代码:

public class KafkaConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CloudKafkaConsumer consumer;
    private final String topicName;

    public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
        this.consumer = consumer;
        this.topicName = topicName;
    }

    @Override
    public void run() {
        try {
            this.consumer.subscribe(topicName);
            ConsumerRecords<String, String> records;
            while (!closed.get()) {
                synchronized (consumer) {
                    records = consumer.poll(100);
                }
                for (ConsumerRecord<String, String> tmp : records) {
                    System.out.println(tmp.value());
                }
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            System.out.println(e);
            //if (!closed.get()) throw e;
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    public static void main(String[] args) {
        CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                .withBootstrapServers("172.31.1.159:9092")
                .withGroupId("test")
                .build();
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
        executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
        executorService.shutdown();
    }
}

但它不起作用并抛出异常:

java.util.ConcurrentModificationException:KafkaConsumer不适合多线程访问

此外,我阅读了Flink的源代码(一个用于分布式流和批量数据处理的开源平台)。Flink使用的多线程消费者与我的类似。

long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
pollLoop: while (running) {
    ConsumerRecords<byte[], byte[]> records;
    //noinspection SynchronizeOnNonFinalField
    synchronized (flinkKafkaConsumer.consumer) {
        try {
            records = flinkKafkaConsumer.consumer.poll(pollTimeout);
        } catch (WakeupException we) {
            if (running) {
                throw we;
            }
            // leave loop
            continue;
        }
    }

Flink Kafka Consumer 09的多线程代码。

有什么问题吗?

3个回答

14

Kafka消费者不是线程安全的。正如您在问题中指出的那样,文档说明了:

一个简单的选择是为每个线程提供一个单独的消费者实例。

但是在您的代码中,相同的消费者实例被不同的KafkaConsumerRunner实例包装。因此,多个线程正在访问同一消费者实例。 Kafka文档明确说明:

Kafka消费者不是线程安全的。所有网络I/O操作都发生在调用应用程序的线程中。用户有责任确保多线程访问得到适当的同步。未同步的访问将导致ConcurrentModificationException。

这正是您收到的异常。


4

在调用subscribe时,它会抛出异常。 this.consumer.subscribe(topicName);

将该块移至同步块中,如下所示:

@Override
public void run() {
    try {
        synchronized (consumer) {
            this.consumer.subscribe(topicName);
        }
        ConsumerRecords<String, String> records;
        while (!closed.get()) {
            synchronized (consumer) {
                records = consumer.poll(100);
            }
            for (ConsumerRecord<String, String> tmp : records) {
                System.out.println(tmp.value());
            }
        }
    } catch (WakeupException e) {
        // Ignore exception if closing
        System.out.println(e);
        //if (!closed.get()) throw e;
    }
}

适用于我。 - Prasath

2

也许不是你的情况,但如果你正在处理多个主题的数据合并,那么你可以使用同一个消费者从多个主题读取数据。如果不是这种情况,最好创建单独的作业来消费每个主题。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接