如何为onFailure事件设置超时时间(Spring,Kafka)?

8

我正在尝试在Spring MVC中实现一种异步的REST方法,用于向Kafka发送消息。一切都运行良好,但当服务器不可用时,onFailure事件会被长时间处理。如何限制ListenableFuture的响应时间,例如将其限制为三秒钟。

以下是我的代码:

@Autowired
KafkaTemplate<String, String> kafkaTemplate;

@Value("${spring.kafka.topic}")
String topic;

@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
        @RequestParam(value = "message") String message
) {

    DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
    ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);

    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

        @Override
        public void onSuccess(SendResult<String, String> sendResult) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

        @Override
        public void onFailure(Throwable ex) {
            ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
            deferredResult.setResult(responseEntity);
        }

    });

    return deferredResult;
}

我尝试使用Kafka的REQUEST_TIMEOUT_MS_CONFIG属性和ListenableFuture的.get(long timeout, TimeUnit unit)方法,但没有得到期望的结果。


get() 应该在超时后失败;那种情况下会发生什么? - Gary Russell
@GaryRussell 如果我在return之前添加future.get(1000, TimeUnit.MILLISECONDS);,那么在60秒内我会得到以下错误。如果没有这个get方法,我会在60秒内得到正确的“FAILURE”响应。HTTP状态500 - 请求处理失败;嵌套异常是java.util.concurrent.ExecutionException:org.springframework.kafka.core.KafkaProducerException:发送失败;嵌套异常是org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败。 - V. Perfilev
看看我的回答,你可以减少max.block.ms - Gary Russell
1个回答

9

这是因为生产者默认会阻塞60秒。

请参考Kafka文档中的max.block.ms 生产者配置

max.block.ms 这个配置控制着KafkaProducer.send()和KafkaProducer.partitionsFor()方法阻塞的时间。这些方法可能因为缓冲区已满或元数据不可用而被阻塞。在用户提供的序列化器或分区器中的阻塞不会计入此超时时间。


非常感谢,这正是我所需要的! - V. Perfilev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接