有没有一种好的方法来检查Datastax Session.executeAsync()是否抛出了异常?

9

我正在尝试通过调用session.executeAsync()而不是session.execute()来加速我们的代码,以进行数据库写入。

我们有一些用例,在这些用例中DB连接可能会中断,当前的execute()在连接丢失时会抛出异常(无法连接到集群中的任何主机)。我们可以捕获这些异常并重试或将数据保存到其他地方等...

使用executeAsync()似乎没有任何方法来实现此用例 - 返回的ResultSetFuture对象需要被访问以检查结果,这将破坏首次使用executeAsync()的目的...

是否有任何方法可以在任何地方添加监听器(或类似内容)来异步通知其他代码DB写入失败的情况?

这与以下版本相关:

Datastax 1.0.2

Java 1.7.40

2个回答

15

你可以尝试像这样做,因为 ResultSetFuture 实现了 Guava 库中的 ListenableFuture:

    ResultSetFuture resultSetFuture = session.executeAsync("SELECT * FROM test.t;");
    Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
            // do nothing
        }

        @Override
        public void onFailure(Throwable throwable) {
            System.out.printf("Failed with: %s\n", throwable);
        }
    });

采用这种方法不会阻塞您的应用程序。


这看起来就像是我正在尝试实现的解决方案 - 谢谢! - Phillip Atkinson

1
您可以将回调函数传递给该方法以对异常进行操作。如果需要ResultSetFuture,您可以尝试像这样做:
interface ResultSetFutureHandler {
    void handle(ResultSetFuture rs);
}

public void catchException(ResultSetFutureHandler handler) {
    ResultSetFuture resultSet = null;
    try {
        resultSet = getSession().executeAsync(query);
        for (Row row : results.getUninterruptibly()) {
            // do something
        }
    } catch (RuntimeException e) {
        handler.handle(resultSet); // resultSet may or may not be null
    }
}

然后像这样调用它:
catchException(new ResultSetFutureHandler() {
    void handle(ResultSetFuture resultSet) {
        // do something with the ResultSetFuture
    }
});

如果你需要知道异常是什么,请添加一个异常参数:
interface ResultSetFutureHandler {
    void handle(ResultSetFuture rs, RuntimeException e);
}

2
嗨!谢谢你的想法。但我认为getUninterruptibly()调用会阻塞,直到查询返回结果,所以这似乎会抵消使用executeAsync()的好处?我正在研究如何使用executeAsync()来“点火并忘记”一个查询,并且有一种方式来挂钩某种异步监听器,如果出现异常就会被调用。 - Phillip Atkinson
这就是它的作用。 “listener” 是您传入的 ResultSetFutureHandler - 当出现异常时,它将被调用,就像您想要的那样。您可以以不同的方式打包它以使其更加正式或优雅,但是想法是相同的。 - Bohemian
当您的代码示例出现异常时,由于代码等待getUninterruptibly()调用,我们是否已经失去了executeAsync()的优势?这是我看到的示例操作方式: // executeAsync()只会抛出IllegalStateException,我认为这并不表示网络故障 resultSet = getSession().executeAsync(query); // getUninterruptibly()会阻塞,直到查询实际运行并返回 // 来自javadoc:“等待查询返回并返回其结果。” for (Row row : resultSet.getUninterruptibly()) { - Phillip Atkinson
由于getUniterruptibly()会抛出期望的NoHostAvailableException/QueryExecutionException,但同时也会阻塞直到查询运行,我不认为调用该方法有助于我寻求异步处理的解决方案? - Phillip Atkinson
我试图避免需要线程处理任何东西,因为到那时我可能会将整个DB调用线程化,并简单地使用execute()而不是executeAsync()。也许我正在寻找的东西根本不可能 :) - Phillip Atkinson
最终,异步调用需要一个线程。我很惊讶API不允许您提供回调函数以在查询返回数据时进行处理。这是我的期望 - 大多数异步API都会这样做。 - Bohemian

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接