Kafka生产者非常缓慢。

6
我是 Kafka 的新手,有一个问题一直没有解决。
我在 Windows 上安装了 Kafka 和 Zookeeper,并创建了一个带有多个分区的代理(在 6 到 12 个分区之间)。
当我创建消费者时,它们可以完美地工作并以很快的速度读取数据,但是对于生产者,我创建了一个简单的生产者,就像许多网站上所看到的那样。该生产者位于一个循环内,发送许多短消息(大约 2000 条非常短的消息)。
我发现消费者可以很快地读取这 2000 条消息,但是生产者只能以每秒约 140 或 150 条的速度将消息发送到代理。正如我之前所说,我正在使用自己的笔记本电脑(仅有 1 个磁盘),但是当我阅读有关每秒数百万条消息的文章时,我认为我忘记了什么,因为我与此相距甚远。
如果我使用更多的生产者,结果会更糟。
这是因为需要在同一节点上使用更多的代理还是其他原因?这个问题是我的工作强加给我的,我没有更好的电脑。
创建生产者的代码如下:
public class Producer {

    public void publica(String topic, String strKey, String strValue) {
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
        ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, strValue);
        producer.send(rec);
    }
}

发送消息的代码(部分)如下:

Producer prod = new Producer();

for (int i = 0; i < 2000; i++)
{
    key = String.valueOf(i);
    prod.publica("TopicName", key, texto + " - " + key);
    // System.out.println(i + " - " + System.currentTimeMillis());
}

你能发布一下生成所用的代码吗?我猜你正在同步生成 - 你应该采取异步生成。磁盘不应该是一个问题(在后台完成磁盘刷新)。 - Treziac
我不知道你是否解决了问题,以达到你所希望的性能,但需要注意的是,Kafka包含大量的集群/代理配置选项,以及大量的生产者配置选项,这些选项可以通过集群本身或者在初始化生产者时的代码配置选项进行配置。其中一些配置选项可能会对消息生产速率产生重大影响。很抱歉我现在无法给出更具体的信息,因为我正在调查自己的慢生产者问题。 - undefined
1个回答

18
您可以创建Kafka生产者一次,并在每次需要发送消息时重复使用它:
public class Producer {
    private final KafkaProducer<String, String> producer; // initialize in constructor

    public void publica(String topic, String strKey, String strValue) {
        ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, strValue);
        producer.send(rec);
    }
}

还要看一下这里提供的生产者和代理配置 here。有几个选项可以根据您的应用程序需求进行调整。


1
我今年54岁,从22岁左右开始就是一名开发人员,有时候我觉得自己还是个初学者。我没有意识到Kafka生产者每次都会被创建而不是只创建一次。我进行了更改然后... - Enrique López Moreno
之前:15秒内2000条消息。之后:大约170毫秒内发送2000条消息。非常感谢。 - Enrique López Moreno
你是不是需要在一定时间内关闭生产者以防止内存泄漏? - user1955934

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接