如何正确处理来自Guava的ListenableFuture异常?

4

我有一个库,为我们的客户提供了两种方法:同步和异步。他们可以根据自己的目的调用任何一种方法。

  • executeSynchronous() - 等待直到获得结果,返回结果。
  • executeAsynchronous() - 立即返回一个 Future 对象,如果需要,可以在完成其他事情后处理它。

他们将传递 DataKey 对象,其中包含用户 ID。我们将根据用户 ID 确定要调用哪台机器。因此,我们将使用 AsyncRestTemplate 进行 HTTP 调用,并根据是否成功将响应发送回给他们。

以下是我的接口:

public interface Client {
    // for synchronous
    public DataResponse executeSync(final DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsync(final DataKey key);
}

以下是我的实现:
public class DataClient implements IClient {

    // does this have to be final?
    private final AsyncRestTemplate restTemplate = new AsyncRestTemplate();

    @Override
    public DataResponse executeSync(final DataKey keys) {
        Future<DataResponse> responseFuture = executeAsync(keys);
        DataResponse response = null;
        try {
            response = responseFuture.get(keys.getTimeout(), TimeUnit.Milliseconds);
        } catch (CancellationException e) {
            // what to do here?
        }  catch (InterruptedException e) {
            // is this right way to deal with InterruptedException?
            throw new RuntimeException("Interrupted", e);
        } catch (ExecutionException e) {
            // what do you mean by ExecutionException? And how should we deal with this?
            DataLogging.logErrors(e.getCause(), DataErrorEnum.ERROR_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        } catch (TimeoutException e) {
            DataLogging.logErrors(e.getCause(), DataErrorEnum.TIMEOUT_ON_CLIENT, keys);
            response = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);       
        }

        return response;
    }

    @Override
    public Future<DataResponse> executeAsync(final DataKey keys) {
        final SettableFuture<DataResponse> responseFuture = SettableFuture.create();
        restTemplate.exchange(createURL(keys), HttpMethod.GET, keys.getEntity(), String.class).addCallback(
                new ListenableFutureCallback<ResponseEntity<String>>() {
                    @Override
                    public void onSuccess(ResponseEntity<String> result) {
                        responseFuture.set(new DataResponse(result.getBody(), DataErrorEnum.OK,
                                DataStatusEnum.SUCCESS));
                    }

                    @Override
                    public void onFailure(Throwable ex) {
                        DataLogging.logErrors(ex, DataErrorEnum.ERROR_SERVER, keys);
                        responseFuture.set(new DataResponse(null, DataErrorEnum.ERROR_CLIENT,
                                DataStatusEnum.ERROR));
                    }
                });

        return responseFuture;

    }
}

现在我的问题是:
  • executeSync的catch块中如何正确处理异常?CancellationException和TimeoutException之间有什么区别?一般情况下我们应该如何处理ExecutionException
  • 我的DataKey在接口中必须是final的吗?如果我在executeAsync实现中删除final变量,则会出现编译错误,提示“无法引用在不同方法中定义的内部类中的非最终变量keys”。
  • 这是在我的executeAsync方法中使用ListenableFutureCallback的正确方式吗?还是有更好的使用方法?
欢迎对我关于同步和异步实现的设计提出任何意见/建议。
1个回答

4

我假设您正在使用 Spring 4 (AsyncRestTemplate)。在这种情况下,您获得的 ListenableFuture 并不是真正的 Guava ListenableFuture,而是 Spring 中的克隆版本。无论如何,您应该以与标准 Future 处理异常相同的方式来处理异常。

您的问题的答案:

// does this have to be final? private final AsyncRestTemplate
restTemplate = new AsyncRestTemplate();

在这种情况下,它并没有影响,但一般来说,这是一个好习惯,因为它使对象不可变,简化了对其行为的推理。

catch (CancellationException e) {
    // what to do here?
}
如果任务被取消(通过Future#cancel或ExecutorService#shutdownNow),将抛出CancellationException。在您的情况下,它不可能发生,因为只有您拥有对Future的引用,并且(隐式地通过私有AsyncRestTemplate)引用了ExecutorService用于执行查询。因此,不需要担心这个问题。
throw new AssertionError("executeAsync task couldn't be cancelled", e);

取消异常(CancellationException)和超时异常(TimeoutException)之间有什么区别吗?

在 Future#get 调用中,您指定了超时时间。如果在 keys.getTimeout() 毫秒后仍未获得结果,则会抛出 TimeoutException 异常。

catch (InterruptedException e) {
   // is this right way to deal with InterruptedException?
   throw new RuntimeException("Interrupted", e);
}

在这种情况下,当客户端线程被中断时,不会抛出InterruptedException。你不拥有那个线程,所以你应该传播InterruptedException(即声明executeSync(DataKey keys) throws InterruptedException)。如果由于某种原因无法更改方法的签名,则至少在抛出RuntimeException之前恢复中断标志(Thread.currentThread().interrupt())。

catch (ExecutionException e) {
   // what do you mean by ExecutionException? And how should we deal with this?
   DataLogging.logErrors(e.getCause(), DataErrorEnum.ERROR_CLIENT, keys);
   response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
}

ExecutionException 表示提交给 ExecutorService 作为 Callable/Runnable 的代码在执行期间抛出异常。在您的情况下,永远不会抛出 ExecutionException,因为您返回了 SettableFuture,并在 onSuccess 和 onFailure 回调中设置了值,所以可以在 catch 块中抛出 AssertionError。没有通用的对 ExecutionException 的响应策略。

我的接口中的 DataKey 必须是 final 吗?

在 executeAsync 实现中必须是 final,因为您从匿名类 (onFailure 回调) 引用它;

这是在我的 executeAsync 方法中使用 ListenableFutureCallback 的正确方式吗?还是有更好的使用方法?

我没有看到任何问题。

一些建议:

  1. 考虑使异步客户端的线程池可配置。

默认情况下,AsyncRestTemplate 使用 SimpleAsyncTaskExecutor 为每个请求创建一个新线程。这可能不适合所有客户端。请注意,如果遵循此建议,则对 CancellationException 的响应必须不同,因为客户端现在可能具有对 ExecutorService 的引用:抛出 RuntimeException 应该没问题。

  1. 在 (java)doc 中描述默认使用的线程池!

  2. 我会分割同步和异步版本。

  3. 我认为使用同步 RestTemplate 并通过同步版本实现异步版本将简化实现。

  4. 考虑返回更灵活的 ListenableFuture,而不是普通的 Future(使用 SettableListenableFuture 而不是 SettableFuture)。


感谢您的建议,非常感激您的帮助。我已经得到了大部分问题的解决思路。如果我需要将异步客户端的线程池设置为可配置的,该怎么做呢?正如您在第一个建议中所说的那样。此外,您能否提供一个SettableListenableFuture的示例?这将有助于我更好地理解。还有,为什么您建议使用SettableListenableFuture而不是SettableFuture? - john
使异步客户端可配置的最简单方法可能是创建第二个构造函数,其中包含ThreadPoolTaskExecutor参数。在此构造函数中,只需创建AsyncRestTemplate并将传递的线程池作为AsyncTaskExecutor提供即可。 - Alex Filatov
executeAsync返回标准Future。如果您将SettableFuture替换为SettableListenableFuture,则可以返回LinstenableFuture。这将允许executeAsync的调用者添加回调(类似于您正在执行的操作)。 - Alex Filatov
实际上,在你的情况下,我关于ExecutionException的说法是错误的。请查看更新。 - Alex Filatov
当然,我明白了。另外,您能告诉我为什么建议为异步客户端配置线程池吗? - john

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接