我正在尝试在Spring MVC中实现一种异步的REST方法,用于向Kafka发送消息。一切都运行良好,但当服务器不可用时,onFailure事件会被长时间处理。如何限制ListenableFuture的响应时间,例如将其限制为三秒钟。
以下是我的代码:
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Value("${spring.kafka.topic}")
String topic;
@RequestMapping("/test")
DeferredResult<ResponseEntity<?>> test(
@RequestParam(value = "message") String message
) {
DeferredResult<ResponseEntity<?>> deferredResult = new DeferredResult<>();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, "testKey", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> sendResult) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("SUCCESS", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
@Override
public void onFailure(Throwable ex) {
ResponseEntity<String> responseEntity = new ResponseEntity<>("FAILURE", HttpStatus.OK);
deferredResult.setResult(responseEntity);
}
});
return deferredResult;
}
我尝试使用Kafka的REQUEST_TIMEOUT_MS_CONFIG
属性和ListenableFuture的.get(long timeout, TimeUnit unit)
方法,但没有得到期望的结果。
get()
应该在超时后失败;那种情况下会发生什么? - Gary Russellreturn
之前添加future.get(1000, TimeUnit.MILLISECONDS);
,那么在60秒内我会得到以下错误。如果没有这个get方法,我会在60秒内得到正确的“FAILURE”响应。HTTP状态500 - 请求处理失败;嵌套异常是java.util.concurrent.ExecutionException:org.springframework.kafka.core.KafkaProducerException:发送失败;嵌套异常是org.apache.kafka.common.errors.TimeoutException:60000毫秒后更新元数据失败。
- V. Perfilevmax.block.ms
。 - Gary Russell