private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
我的上述保存方法将从多个线程以非常快的速度调用。
问题:
我想对executeAsync
方法进行请求限流,它会异步写入Cassandra。如果我以非常高的速度写入,而我的Cassandra集群无法处理,则会开始抛出错误,我希望所有写入都能成功地进入cassandra而没有任何损失。
我看到了这篇文章,其中解决方案是使用具有固定许可证数量的Semaphore
。但我不确定如何以及什么是实现最佳方式。我以前从未使用过Semaphore。这是逻辑。是否可以基于我的代码提供一个Semaphore示例或者是否有更好的方法/选项,请让我知道。
在编写数据加载程序的上下文中,您可以执行以下操作:
- 为了保持简单,使用具有固定许可证数量的Semaphore或其他构造(这将是您的最大飞行请求数)。每当您要使用executeAsync提交查询时,请获取许可证。您确实只需要1个线程(但可能需要引入一个大小为#CPU核心的池,以执行此操作),该线程从Semaphore获取许可证并执行查询。它将在获取时阻塞,直到有可用的许可证。
- 对从executeAsync返回的future使用Futures.addCallback。回调应在onSuccess和onFailure情况下都调用Semaphore.release()。通过释放许可证,这应该允许步骤1中的线程继续并提交下一个请求。
我还看到了其他一些文章,其中他们谈论了使用RingBuffer
或Guava RateLimitter
,那么哪个更好,我应该使用哪个?以下是我能想到的选项:
- 使用Semaphore
- 使用环形缓冲区
- 使用Guava速率限制器
是否可以帮助我提供一个示例,说明如何对cassandra写入进行请求限流或获得背压,并确保所有写入都成功地进入cassandra?