我是 Kafka 的新手,有一个问题一直没有解决。
我在 Windows 上安装了 Kafka 和 Zookeeper,并创建了一个带有多个分区的代理(在 6 到 12 个分区之间)。
当我创建消费者时,它们可以完美地工作并以很快的速度读取数据,但是对于生产者,我创建了一个简单的生产者,就像许多网站上所看到的那样。该生产者位于一个循环内,发送许多短消息(大约 2000 条非常短的消息)。
我发现消费者可以很快地读取这 2000 条消息,但是生产者只能以每秒约 140 或 150 条的速度将消息发送到代理。正如我之前所说,我正在使用自己的笔记本电脑(仅有 1 个磁盘),但是当我阅读有关每秒数百万条消息的文章时,我认为我忘记了什么,因为我与此相距甚远。
如果我使用更多的生产者,结果会更糟。
这是因为需要在同一节点上使用更多的代理还是其他原因?这个问题是我的工作强加给我的,我没有更好的电脑。
创建生产者的代码如下:
我在 Windows 上安装了 Kafka 和 Zookeeper,并创建了一个带有多个分区的代理(在 6 到 12 个分区之间)。
当我创建消费者时,它们可以完美地工作并以很快的速度读取数据,但是对于生产者,我创建了一个简单的生产者,就像许多网站上所看到的那样。该生产者位于一个循环内,发送许多短消息(大约 2000 条非常短的消息)。
我发现消费者可以很快地读取这 2000 条消息,但是生产者只能以每秒约 140 或 150 条的速度将消息发送到代理。正如我之前所说,我正在使用自己的笔记本电脑(仅有 1 个磁盘),但是当我阅读有关每秒数百万条消息的文章时,我认为我忘记了什么,因为我与此相距甚远。
如果我使用更多的生产者,结果会更糟。
这是因为需要在同一节点上使用更多的代理还是其他原因?这个问题是我的工作强加给我的,我没有更好的电脑。
创建生产者的代码如下:
public class Producer {
public void publica(String topic, String strKey, String strValue) {
Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configProperties);
ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topic, strValue);
producer.send(rec);
}
}
发送消息的代码(部分)如下:
Producer prod = new Producer();
for (int i = 0; i < 2000; i++)
{
key = String.valueOf(i);
prod.publica("TopicName", key, texto + " - " + key);
// System.out.println(i + " - " + System.currentTimeMillis());
}