如何确保消息到达Kafka Broker?

3

我在本地机器上有一个消息生产者,远程主机(aws)上有一个代理。

从生产者发送消息后,我等待并调用远程主机上的控制台消费者,发现有过多的日志记录,但没有生产者的值。

生产者在调用发送方法后刷新数据。一切都已正确配置。

我该如何检查代理是否收到了来自生产者的消息,并查看生产者是否收到了回复?


1
你的意思是你的生产者基本上不工作?你所说的“看到过多的日志”是什么意思?你是否正确配置了代理的监听器? - Robin Moffatt
@RobinMoffatt 我的意思是在执行控制台消费者后,控制台中有很多没有实际价值的无意义日志。有趣的是,在将代理移动到本地机器并编辑URL后,一切都正常工作。我想知道如何处理与AWS上的代理的通信。再次,我打开了所有入站和出站端口。 - voipp
3个回答

1

问题在于将经纪人放置在远程AWS服务器上。如果我将它们放在本地,一切都正常工作。 您的建议如何帮助我? - voipp

1

Send方法异步地将消息发送到主题,并返回一个Future对象,其中包含RecordMetadata

java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record)

异步地将记录发送到主题

在调用flush方法之后,通过调用isDone方法检查Future是否已完成(例如,Future.isDone() == true)。

调用此方法将使所有缓冲的记录立即可用于发送(即使linger.ms大于0),并阻塞等待与这些记录相关联的请求完成。flush()的后置条件是任何先前发送的记录都已完成(例如,Future.isDone() == true)。当请求根据您指定的acks配置成功确认或导致错误时,请求被视为已完成。

RecordMetadata包含offsetpartition

public int partition()

记录被发送到的分区

public long offset()

记录的偏移量,如果{hasOffset()}返回false,则为-1。

或者您也可以使用Callback函数来确保消息是否已发送到主题

完全非阻塞使用可以利用回调参数提供回调,在请求完成时将调用该回调。

这里有一个清晰的示例在文档中

 ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
 producer.send(myRecord,
           new Callback() {
               public void onCompletion(RecordMetadata metadata, Exception e) {
                   if(e != null) {
                      e.printStackTrace();
                   } else {
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                   }
               }
           });

我已将批处理发送到远程代理。只要它到达代理,就会创建新的分区(它是新的分区)。但我无法看到我发送到该分区的消息。 - voipp
这些是已更改的分区吗 @voipp - Ryuzaki L
我刚在本地机器上更换了代理,一切正常。 - voipp
@Deadpool,如果 hasOffset() 返回 true,我可以说该主题已经接收到消息了吗? - jumping_monkey
可以的,@jumping_monkey - Ryuzaki L
显示剩余2条评论

1
您可以尝试使用send() API的get()方法,它将返回RecordMetadata的Future。
ProducerRecord<String, String> record = 
new ProducerRecord<>("SampleTopic", "SampleKey", "SampleValue");

try {
    producer.send(record).get(); 
} catch (Exception e) {
    e.printStackTrace(); 
}


@Chandra Mohan G,如果catch块从未执行,也就是说.get()成功“执行”,我可以说主题已经接收到消息吗?还是需要检查返回的RecordMetadata? - jumping_monkey

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接