如何使用Guava RateLimiter以特定速率向Cassandra发送请求?

7
我正在使用DataStax Java Driver 3.1.0连接到Cassandra集群,我的Cassandra集群版本为2.0.10。我正在使用QUORUM一致性进行异步写入。
  private final ExecutorService executorService = Executors.newFixedThreadPool(10);
  private final Semaphore concurrentQueries = new Semaphore(1000);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      concurrentQueries.acquire();
      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          concurrentQueries.release();
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          concurrentQueries.release();
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

我的保存方法将会被多个线程以非常快的速度调用。如果我写入的速度比我的Cassandra集群处理能力快,那么它将开始抛出错误,并且我希望所有的写入都可以成功地进入Cassandra而没有任何损失。

问题:

我正在考虑使用某种类型的队列或缓冲区来排队请求(例如java.util.concurrent.ArrayBlockingQueue)。 "缓冲区已满" 将意味着客户端应该等待。缓冲区还将用于重新排队失败的请求。但是为了更公平,失败的请求可能应该放在队列的前面,以便首先重试它们。此外,我们还应该解决队列已满并且有新的失败请求的情况。然后,单线程工作程序将从队列中选择请求并将其发送到Cassandra。由于它不应该做太多事情,因此它不太可能成为瓶颈。这个工作者可以应用自己的速率限制,例如基于时间的com.google.common.util.concurrent.RateLimiter

实现这个队列或缓冲区功能并在写入Cassandra时应用特定的guava速率限制的最佳方法是什么?如果有更好的方法,请告诉我。我希望以每秒2000个请求写入Cassandra(这应该是可配置的,以便我可以尝试不同的设置来查看最佳设置)。

如下面的评论中所述,如果内存持续增加,我们可以使用Guava Cache或CLHM来删除旧记录,以确保我的程序不会耗尽内存。我们将在盒子上拥有大约12GB的内存,并且这些记录非常小,因此我认为这不应该是一个问题。


1
请提供您使用的实例和集群的一些信息,包括表创建语句以及访问模式的描述。您正在使用什么副本因子?通常情况下,Cassandra 的写入非常快速,即使在非常普通的集群上,您也可以超过 2000 个请求/秒。您还可以检查一下语句是否真正准备就绪,以及客户端是否由于某种原因每次都在准备语句?如果未实现缓冲,数据输入的速度是多少?我的直觉是您的 Cassandra 集群可能需要进行一些扩展。 - Marko Švaljek
我们在每个数据中心有三个节点,复制因子为3。在这张表上,我们将以非常高的速度进行写入,然后稍后我们将对其进行一些离线分析。是的,我会缓存准备好的语句,然后重复使用该准备好的语句。这些Cassandra集群设置不在我的控制范围内,因为我们公司的其他团队负责管理,所以我想确保至少我的代码不会失败,并且我们能够写入所有内容。 - john
我现在明白了。问题是,即使你使用内存作为缓冲区,在某些时候你可能会在负载下用尽它,所以即使你可以限制速率,实际上这可能是一个坏主意。你考虑过将这些消息放入某种队列中,例如kafka或sqs,然后通过一些简单的应用程序/进程将消息取出并以你可以轻松控制的速率推送到cassandra吗?这种模式非常好用。如果cassandra节点死亡,管理所有这些内容可能会让你陷入更大的麻烦。而且有了物理队列,你会更加安全。 - Marko Švaljek
@MarkoŠvaljek 我们已经在某些其他目的上使用Kafka或Zeromq,但是对于这个问题,我们没有看到任何需要将数据存储在消息队列中的必要性。我希望负载不会太大以至于我们会耗尽内存或出现其他问题。我只是想试一试这种方法,看看它的表现如何,然后与我拥有的其他方法进行比较,并根据此来决定我们需要做什么。 - john
让我们在聊天中继续这个讨论 - Marko Švaljek
显示剩余4条评论
1个回答

2
如果我以比我的Cassandra集群处理能力更高的速度进行写作,那么它就会开始抛出错误,而我希望所有的写作都能成功地进入Cassandra,没有任何损失。
Datastax驱动程序允许配置每个主机的连接数和每个连接的并发请求数量 (请参见PoolingOptions设置)
调整这些设置以减轻Cassandra集群的压力。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接