Spring Kafka异步发送调用会阻塞

21

我正在使用Spring-Kafka 1.2.1版本,当Kafka服务器宕机/无法访问时,异步发送调用会阻塞一段时间。这似乎是TCP超时的问题。代码类似于:

ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
    @Override
    public void onSuccess(SendResult<K, V> result) {
        ...
    }

    @Override
    public void onFailure(Throwable ex) {
        ...
    }
});

我已经快速查看了Spring-Kafka代码,似乎只是将任务传递给Kafka客户端库,将回调交互转换为未来对象交互。查看Kafka客户端库,代码变得更加复杂,我没有花时间理解其中全部,但我猜测它可能在同一线程中进行远程调用(至少元数据吧?)。

作为用户,我希望返回未来的Spring-Kafka方法能够立即返回,即使远程kafka服务器无法访问。

如果我的理解有误或这是一个错误,欢迎任何确认。目前我在本地端使其异步化。

另一个问题是,Spring-Kafka文档在开头说提供同步和异步的发送方法,但我找不到任何不返回未来对象的方法,也许需要更新文档。

如有需要,我很乐意提供任何进一步的细节。谢谢。


你能解决这个问题吗? - noobCoder
不需要了。我已经在我的端口实现了所需的内容,并保持不变。由于现在已经过了很长时间,也许新版本已经解决了这些问题?你试过2.1.10和2.2.0吗? - Carlos E. L. Augusto
5个回答

16

除了在配置类上使用@EnableAsync注解外,还需要在调用此代码的方法上使用@Async注解。

http://www.baeldung.com/spring-async

以下是一些代码片段。Kafka生产者配置:

@EnableAsync
@Configuration
public class KafkaProducerConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);

    @Value("${kafka.brokers}")
    private String servers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, GenericMessage> producerFactory(ObjectMapper objectMapper) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), new StringSerializer(), new JsonSerializer(objectMapper));
    }

    @Bean
    public KafkaTemplate<String, GenericMessage> kafkaTemplate(ObjectMapper objectMapper) {
        return new KafkaTemplate<String, GenericMessage>(producerFactory(objectMapper));
    }

    @Bean
    public Producer producer() {
        return new Producer();
    }
}

还有制片人本身:

public class Producer {

    public static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

    @Autowired
    private KafkaTemplate<String, GenericMessage> kafkaTemplate;

    @Async
    public void send(String topic, GenericMessage message) {
        ListenableFuture<SendResult<String, GenericMessage>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, GenericMessage>>() {

            @Override
            public void onSuccess(final SendResult<String, GenericMessage> message) {
                LOGGER.info("sent message= " + message + " with offset= " + message.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(final Throwable throwable) {
                LOGGER.error("unable to send message= " + message, throwable);
            }
        });
    }
}

1
谢谢您的回复。不,我没有使用这些注释,在文档中也没有关于它们的内容。我会尝试两种方法,并告诉您是否解决了问题。 - Carlos E. L. Augusto
1
使用EnableAsync不幸地没有改变任何东西。此外,从链接中我了解到应该是spring-kafka库使用Async注释,因为它为我提供了future对象。 - Carlos E. L. Augusto
2
我同意你的观点,对我来说,提供未来的功能却仍需放置注释是没有意义的。但在我们的情况下,放置这两个注释使得它能够完美运行。我将编辑响应并添加一些代码片段。 - Jorge C
1
我理解你的代码片段是通过使用Spring的方式将自己的代码异步化。它对你起作用,因为send方法返回void,所以Spring在新线程中执行其内容并立即返回给send的调用者。但是,你所做的spring-kafka调用仍然是同步的。你的代码类似于我在我的代码中解决问题时所做的,但我使用了普通的Java,因为我需要更多的控制。无论是你的解决方案还是我的解决方案,如果spring-kafka正确执行,我都会期望执行流程在两个线程之间改变两次。 - Carlos E. L. Augusto

11
如果我看一下KafkaProducer本身,发送消息有两个部分:
  1. 将消息存储到内部缓冲区中。
  2. 将消息从缓冲区上传到Kafka。
KafkaProducer对于第二部分是异步的,而第一部分则不是。
send()方法仍然可以在第一部分上被阻塞,并最终抛出TimeoutExceptions,例如:
  • 主题的元数据没有被缓存或过期,因此生产者尝试从服务器获取元数据,以了解主题是否仍然存在以及它有多少个分区。
  • 缓冲区已满(默认情况下为32MB)。
如果服务器完全无响应,则可能会遇到以上两个问题。
更新: 我在Kafka 2.2.1中进行了测试和确认。看起来这种行为在2.4和/或2.6中可能会有所不同:KAFKA-3720

哇,竟然要等3年才有一个实际的答案,谢谢。虽然我已经不再使用它了,但我确实花时间查看了当前的文档,看看是否有描述这个问题的内容,但并没有。这份文档肯定需要改进。 - Carlos E. L. Augusto
抱歉,我误按了回车键并发送了评论,然后花了超过5分钟来编辑...所以以下是剩下的内容:我没有预料到这种行为,我期望在使用异步发送方法时从future对象中获取任何错误。同步和异步版本中返回future也没有帮助...再次感谢您抽出时间。 - Carlos E. L. Augusto
我其实自己也遇到了这个问题,在意识到它已经三年前的时候回答了你的问题 :-) - GeertPt
阅读您提到的问题,该行为似乎是合理的。然而,这确实需要在文档中进行解释。但正如所说的那样,即使测试也期望不同的行为 :P - Carlos E. L. Augusto

3

最佳解决方案是在生产者级别添加“回调”监听器。

@Bean
public KafkaTemplate<String, WebUserOperation> operationKafkaTemplate() {
    KafkaTemplate<String, WebUserOperation> kt = new KafkaTemplate<>(operationProducerFactory());
    kt.setProducerListener(new ProducerListener<String, WebUserOperation>() {

        @Override
        public void onSuccess(ProducerRecord<String, WebUserOperation> record, RecordMetadata recordMetadata) {
            System.out.println("### Callback :: " + recordMetadata.topic() + " ; partition = " 
                    + recordMetadata.partition()  +" with offset= " + recordMetadata.offset()
                    + " ; Timestamp : " + recordMetadata.timestamp() + " ; Message Size = " + recordMetadata.serializedValueSize());
        }

        @Override
        public void onError(ProducerRecord<String, WebUserOperation> producerRecord, Exception exception) {
            System.out.println("### Topic = " + producerRecord.topic() + " ; Message = " + producerRecord.value().getOperation());
            exception.printStackTrace();
        }
    });
    return kt;
}

我知道我晚了近18个月,但如果你/有人能解释一下这是如何解决所描述的问题,那将会很有帮助。 - ranban282

2

请确认一下,您是否已经应用了@EnableAsync注解?我想说这可能是指定Future<>行为的关键。


谢谢您的回复。不,我没有使用这个注释,在文档中也没有关于它的内容。我会尝试一下,如果解决了问题,我会告诉您的。 - Carlos E. L. Augusto
使用@EnableAsync不幸地没有改变任何事情=/ - Carlos E. L. Augusto

0

以下代码对我来说可以异步获取响应

    ProducerRecord<UUID, Person> person = new ProducerRecord<>(kafkaTemplate.getDefaultTopic(), messageKey,Person);
    Runnable runnable = () -> kafkaTemplate.send(person).addCallback(new MessageAckHandler());
    new Thread(runnable).start();

  public class MessageAckHandler implements ListenableFutureCallback<SendResult<UUID,Person>> {

    @Override
    public void onFailure(Throwable exception) {
      log.error("unable to send message: " + exception.getMessage());
     }

     @Override
     public void onSuccess(SendResult<UUID, ScreeningEvent> result) {
       log.debug("sent message with offset={} messageID={}", result.getRecordMetadata().offset(), result.getProducerRecord().key());
     }
  }

   public class SendResult<K, V> {

     private final ProducerRecord<K, V> producerRecord;

     private final RecordMetadata recordMetadata;

     public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        this.producerRecord = producerRecord;
        this.recordMetadata = recordMetadata;
    }

    public ProducerRecord<K, V> getProducerRecord() {
       return this.producerRecord;
    }

    public RecordMetadata getRecordMetadata() {
       return this.recordMetadata;
    }

    @Override
    public String toString() {
       return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
    }

 }

1
当然,这个方法是可行的,但你只是在自己创造一个异步,这也正是我当时所做的,并且也是我在原始问题中提出的。当时我想知道这是否是Spring Kafka中的一个错误,或者我是否漏掉了什么... - Carlos E. L. Augusto
@CarlosE.L.Augusto:请分享一下你在代码中处理问题的方法。 - Praveen kumar singhal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接