如何在使用“executeAsync”时限制写入请求到Cassandra?

18
我正在使用Datastax Java驱动程序3.1.0连接到Cassandra集群,我的Cassandra集群版本是2.0.10。我正在使用QUORUM一致性异步写入数据。
  private final ExecutorService executorService = Executors.newFixedThreadPool(10);

  public void save(String process, int clientid, long deviceid) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
  }

我的上述保存方法将从多个线程以非常快的速度调用。

问题:

我想对executeAsync方法进行请求限流,它会异步写入Cassandra。如果我以非常高的速度写入,而我的Cassandra集群无法处理,则会开始抛出错误,我希望所有写入都能成功地进入cassandra而没有任何损失。

我看到了这篇文章,其中解决方案是使用具有固定许可证数量的Semaphore。但我不确定如何以及什么是实现最佳方式。我以前从未使用过Semaphore。这是逻辑。是否可以基于我的代码提供一个Semaphore示例或者是否有更好的方法/选项,请让我知道。

在编写数据加载程序的上下文中,您可以执行以下操作:

  • 为了保持简单,使用具有固定许可证数量的Semaphore或其他构造(这将是您的最大飞行请求数)。每当您要使用executeAsync提交查询时,请获取许可证。您确实只需要1个线程(但可能需要引入一个大小为#CPU核心的池,以执行此操作),该线程从Semaphore获取许可证并执行查询。它将在获取时阻塞,直到有可用的许可证。
  • 对从executeAsync返回的future使用Futures.addCallback。回调应在onSuccess和onFailure情况下都调用Semaphore.release()。通过释放许可证,这应该允许步骤1中的线程继续并提交下一个请求。

我还看到了其他一些文章,其中他们谈论了使用RingBufferGuava RateLimitter,那么哪个更好,我应该使用哪个?以下是我能想到的选项:

  • 使用Semaphore
  • 使用环形缓冲区
  • 使用Guava速率限制器

是否可以帮助我提供一个示例,说明如何对cassandra写入进行请求限流或获得背压,并确保所有写入都成功地进入cassandra?

2个回答

11
不是权威的答案,但也许会有所帮助。首先,你应该考虑当查询无法立即执行时该怎么办。无论你选择哪种速率限制,如果请求的速率高于你能够写入Cassandra的速率,最终你的进程将被等待请求堵塞。在那个时候,你需要告诉客户端暂时保持他们的请求("推迟")。例如,如果它们通过HTTP发送,则响应状态将为429 "Too Many Requests"。如果你在同一个进程中生成请求,那么决定可接受的最长超时时间。也就是说,如果Cassandra跟不上,那么就是时候扩展(或调整)它了。
在实施速率限制之前,也许值得进行一些实验,在调用save方法之前给线程添加人为延迟(例如使用Thread.sleep(...)),看看是否确实是你的问题,或者是否需要其他解决方案。
查询返回错误是Cassandra的反压力。但是你可以选择或实现RetryPolicy来确定何时重试失败的查询。

你还可以查看连接池选项(特别是监控和调优池)。可以调整每个连接的异步请求数量。然而文档表示对于Cassandra 2.x,该参数最高为128,不应更改(但我会尝试一下:)

使用Semaphore的实现如下:

/* Share it among all threads or associate with a thread for per-thread limits
   Number of permits is to be tuned depending on acceptable load.
*/
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) {
  ....
  queryPermits.acquire(); // Blocks until a permit is available

  ResultSetFuture future = session.executeAsync(bs);
  Futures.addCallback(future, new FutureCallback<ResultSet>() {
    @Override
    public void onSuccess(ResultSet result) {
      queryPermits.release();
      logger.logInfo("successfully written");
    }
    @Override
    public void onFailure(Throwable t) {
      queryPermits.release(); // Permit should be released in all cases.
      logger.logError("error= ", t);
    }
  }, executorService);
  ....
}

(在真实的代码中,我会创建一个装饰器来调用包装的方法,然后释放许可。)

Guava的RateLimiter类似于信号量,但允许在低利用率期间进行临时突发,并根据时间限制请求(而不是活动查询的总数)。

然而,由于各种原因,请求可能仍然失败,所以最好有一个重试它们的计划(以防断断续续的错误)。

在您的情况下可能不合适,但我建议尝试使用一些队列或缓冲区来排队请求(例如java.util.concurrent.ArrayBlockingQueue)。"缓冲区已满"意味着客户端应该等待或放弃请求。缓冲区还将用于重新添加失败的请求。然而,为了更公平,失败的请求可能应该放在队列的前面,以便首先重试它们。此外,还应该想办法处理队列已满并且同时有新的失败请求的情况。然后,单线程的工作程序将从队列中选择请求并将其发送到Cassandra。由于它不需要做太多事情,很少会成为瓶颈。该工作程序还可以应用自己的速率限制,例如基于时间使用com.google.common.util.concurrent.RateLimiter

如果有人想尽可能避免丢失消息,他们可以在Cassandra之前放置一个带持久性的消息代理(例如Kafka)。这样,即使Cassandra长时间宕机,传入的消息也能够存活下来。但是,我猜你的情况下这可能有点过度了。

你认为你能给我提供一个关于队列或缓冲区的示例吗?我认为这会是在我的场景中最适合的。 - john
我知道这是一个老问题,但我偶然发现了它,并且最新的最佳实践在这里记录:https://github.com/DataStax-Examples/concurrent-requests-java - rd22

3

只需使用阻塞队列即可完成。 Futures是线程化的,它们的回调(成功和失败)将充当消费者,而您从哪里调用保存方法将充当生产者。

更好的方法是将完整请求放入队列中,逐个出队并在每次出队时触发保存。

private final ExecutorService executorService = Executors.newFixedThreadPool(10);

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) {
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
    try {
      BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
      bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
      bs.setString(0, process);
      bs.setInt(1, clientid);
      bs.setLong(2, deviceid);

      ResultSetFuture future = session.executeAsync(bs);
      Futures.addCallback(future, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(ResultSet result) {
          logger.logInfo("successfully written");
          queue.take();
        }

        @Override
        public void onFailure(Throwable t) {
          logger.logError("error= ", t);
          queue.take();
        }
      }, executorService);
    } catch (Exception ex) {
      logger.logError("error= ", ex);
    }
}

public void invokeSaveInLoop(){
    Object dummyObj = new Object();
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);;
    for(int i=0; i< 1000; i++){
        save("process", clientid, deviceid, queue);
        queue.put(dummyObj);
    }
}

如果您想进一步检查集群中途的负载,请按如下步骤操作:
public static String getCurrentState(){    
StringBuilder response = new StringBuilder();
            response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n");
            final LoadBalancingPolicy loadBalancingPolicy =
                    cluster.getConfiguration().getPolicies().getLoadBalancingPolicy();
            final PoolingOptions poolingOptions =
                    cluster.getConfiguration().getPoolingOptions();
            Session.State state = session.getState();
            for (Host host : state.getConnectedHosts()) {
                HostDistance distance = loadBalancingPolicy.distance(host);
                int connections = state.getOpenConnections(host);
                int inFlightQueries = state.getInFlightQueries(host);
                response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n",
                                host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries,
                                connections *
                                        poolingOptions.getMaxRequestsPerConnection(distance)))
                        .append("<br>\n");
            }
            return response.toString();
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接