Kafka生产者配置以立即发送消息

3
我正在使用Kafka生产者0.8.2,尝试以立即发送消息的方式将单个消息发送到主题。我有一个控制台消费者来观察消息是否到达。我注意到,除非我立即运行producer.close(),否则消息不会立即发送,而这并不是我想要做的。
正确的生产者配置设置是什么?我正在使用以下配置(我知道它看起来像一堆不同的配置/版本,但我在文档中找不到像我期望的那样工作的东西):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokersStr);
props.put(ProducerConfig.RETRIES_CONFIG, "3");
props.put("producer.type", "sync");
props.put("batch.num.messages", "1");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
props.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
2个回答

1
我发现了一个看起来合理的解决方案,它涉及在生产者的send()命令返回的Future上运行get()。我将send命令从以下内容更改为:
producer.send(record);

以下内容如下:

producer.send(record).get();

如果有更有经验的Kafka用户对这种方法有任何问题,那将是很好的。此外,我想了解是否有生产者的配置设置可以实现相同的功能(即立即发送单个消息而不运行Future的get())。


是的,我真的想知道是否有任何配置可以在不运行get()的情况下实现这一点,因为在某些测试场景中它非常有用。 - undefined

-1

虽然这篇文章很旧了,但我为此困扰了好久,所以在这里发一篇帖子。

在尝试运行 Kafka 示例时,我遇到了同样的问题,只有使用 .get() 才能将消息发送到 Kafka。 KafkaProducer.send(…) 的 Javadoc 明确说明了此方法是异步的。在我的测试代码中,消息确实被发送到了 Kafka,而我的代码继续运行,最后在消息实际发送到 Future 内部之前就已经结束并终止了。

因此,这个.get() 只是在 Future 上阻塞,直到它被实现。这实际上就消除了 Future 带来的好处。一个更简洁的方法可能是在 .send(…) 之后等待一会儿,使用 Thread.sleep(…) (具体要看你的用例)。


睡眠如何更好?现在,你可以使用一些随机等待,而不是等待未来完成。如果必须等待它完成,get() 是更简单的方法。 - Robin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接