Cassandra如何处理阻塞执行语句在DataStax Java驱动程序中

4
阻塞执行来自com.datastax.driver.core.Session的方法。
public ResultSet execute(Statement statement);

对这个方法的评论:

该方法会一直阻塞,直到从数据库中收到至少一些结果。但是,对于SELECT查询,它不能保证结果已经完全接收。但是,它确保已经从数据库中收到了一些响应,并且特别保证如果请求无效,此方法将抛出异常。

来自com.datastax.driver.core.Session的非阻塞执行方法

public ResultSetFuture executeAsync(Statement statement);

这种方法不会阻塞。一旦查询已经传递给底层网络堆栈,它就会返回。特别是,从此方法返回并不能保证查询有效或已提交到实时节点。任何与查询失败有关的异常都将在访问{@link ResultSetFuture}时抛出。

我有两个问题,希望您能帮助我理解。

假设我有100万条记录,我希望它们全部到达数据库(没有任何丢失)。

问题1:如果我有n个线程,所有线程将具有相同数量的记录,它们需要使用阻塞执行调用向Cassandra发送多个插入查询。如果我增加n的值,是否也可以加快我需要将所有记录插入到Cassandra的时间?

这会导致Cassandra的性能问题吗?Cassandra必须确保对于每个单独的插入记录,集群中的所有节点都应立即知道新记录吗?为了维护数据的一致性。(我假设Cassandra节点甚至不考虑使用本地机器时间来控制记录插入时间)。

问题2:使用非阻塞执行,我如何确保所有插入都成功?我所知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。我能做到更好吗?非阻塞执行失败的可能性比阻塞执行更高吗?

非常感谢您的帮助。

2个回答

6
如果我有n个线程,所有线程都必须发送相同数量的记录到数据库。他们继续使用阻塞执行调用向Cassandra发送多个插入查询。如果我增加n的值,是否也有助于加快将所有记录插入Cassandra所需的时间?
在某种程度上是这样的。让我们把客户端实现细节分开一点,从“并发请求的数量”的角度来看问题,因为如果您使用executeAsync,则不需要为每个正在进行的请求都有一个线程。在我的测试中,我发现虽然具有高并发请求的数量非常有价值,但存在一个门槛,超过这个门槛,性能开始下降或出现收益递减。我的一般经验法则是(节点数* native_transport_max_threads(默认值:128)* 2),但您可能会发现更优化的结果。
这里的想法是,排队更多请求不会带来比cassandra一次处理更多请求的价值。通过减少正在进行的请求数量,可以限制驱动程序客户端和cassandra之间的连接上不必要的拥塞。
问2:使用非阻塞执行,如何确保所有插入都成功?我知道的唯一方法是等待ResultSetFuture检查插入查询的执行情况。我能做到更好吗?非阻塞执行失败的几率比阻塞执行更高吗?
通过等待ResultSetFuture并使用get是一种方法,但如果您正在开发完全异步的应用程序,则要尽可能避免阻塞。使用guava,您最好的工具是Futures.addCallbackFutures.transform
  • Futures.addCallback允许您注册一个FutureCallback,当驱动程序接收到响应时执行。在成功的情况下执行onSuccess,否则执行onFailure

  • Futures.transform允许您有效地将返回的ResultSetFuture映射为其他内容。例如,如果您只想要1列的值,可以使用它将ListenableFuture<ResultSet>转换为ListenableFuture<String>,而无需在代码中阻塞ResultSetFuture并获取字符串值。

在编写数据加载程序的上下文中,您可以执行以下操作:

为了保持简单,可以使用Semaphore或其他具有固定许可数量的构造(这将是您最大的飞行请求数量)。每当您使用executeAsync提交查询时,都会获取一个许可证。您应该只需要1个线程(但可能希望引入一个大小为# cpu核心的池),该线程从Semaphore获得许可并执行查询。它将在获取到可用许可之前一直阻塞。
对于从executeAsync返回的future,请使用Futures.addCallback。回调应在onSuccessonFailure情况下都调用Sempahore.release()。通过释放许可证,这应该允许您在步骤1中的线程继续并提交下一个请求。
为了进一步提高吞吐量,您可能希望考虑使用BatchStatement并批量提交请求。如果您保持批次较小(50-250是一个不错的数字)并且如果批次中的所有插入都共享相同的分区键,则这是一个很好的选择。

我对“节点*本地传输最大线程”这一点并不满意。特别是,这种推理(在排队的请求超过Cassandra一次处理的数量时,价值不高)假定旅行时间是瞬间/可以忽略不计的。如果我的客户端和Cassandra节点之间有100毫秒的单向行程时间,并且服务器可以在2毫秒内处理请求,那么我希望一次将约50个请求发送出去。这里的想法是,我现在放在网络上的请求将在约100毫秒后到达,而在此期间,服务器可以处理约50条消息,我希望保持服务器繁忙,始终确保它有工作要做。 - Micah Zoltu

0
除了上面的答案之外,
看起来execute()调用executeAsync(statement).getUninterruptibly(),因此无论您是使用execute()自己管理“n线程池”,并阻塞自己直到执行完成,最多运行n个线程,还是在所有记录上使用executeAsync(),cassandra端性能应该大致相同,具体取决于执行时间/计数+超时。
它们的执行将从池中借用连接,每个执行在客户端具有一个streamId,并在响应返回此streamId时通过future通知您,受客户端每个连接的总请求和每个节点上读取线程的总请求限制,任何更高的数字都将在队列中缓冲(而不是被阻塞),限制为连接maxQueueSize和maxRequestsPerConnection,任何超过这个数字的都应该失败。这样做的好处是executeAsync()不会在每个请求/执行上运行新线程。
因此,必须对可以通过execute()或executeAsync()运行的请求数量进行限制,在execute()中,您正在避免超出这些限制。

就性能而言,如果每个节点无法处理,则会开始出现惩罚,因此使用具有良好大小池的execute()对我来说是有意义的。更好的方法是使用反应式架构,以避免创建太多什么都不做的线程,因此大量线程将在客户端上造成浪费的上下文切换。对于较少的请求,通过避免线程池,使用executeAsync()将更好。

DefaultResultSetFuture future = new DefaultResultSetFuture(..., makeRequestMessage(statement, null));
new RequestHandler(this, future, statement).sendRequest();


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接