如何使用DataStax Java驱动程序在Cassandra中高效地使用预编译语句?

8

我需要使用Datastax Java驱动程序查询Cassandra中的其中一个表。以下是我拥有的代码,可以正常工作 -

public class TestCassandra {

        private Session session = null;
        private Cluster cluster = null;

        private static class ConnectionHolder {
            static final TestCassandra connection = new TestCassandra();
        }

        public static TestCassandra getInstance() {
            return ConnectionHolder.connection;
        }

        private TestCassandra() {
            Builder builder = Cluster.builder();
            builder.addContactPoints("127.0.0.1");

            PoolingOptions opts = new PoolingOptions();
            opts.setCoreConnectionsPerHost(HostDistance.LOCAL, opts.getCoreConnectionsPerHost(HostDistance.LOCAL));

            cluster = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE).withPoolingOptions(opts)
                    .withLoadBalancingPolicy(new TokenAwarePolicy(new DCAwareRoundRobinPolicy("DC2")))
                    .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                    .build();
            session = cluster.connect();
        }

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();

        for (int table = 0; table < 14; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";

            try {
                SimpleStatement query = new SimpleStatement(sql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);

                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();

                    String user_id = r.getString("user_id");
                    userList.add(user_id);
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }

        return userList;
    }
}

我在我的主要应用程序中使用上述类似这样的类 -
TestCassandra.getInstance().getRandomUsers();

有没有一种方法可以有效地在 getRandomUsers 中使用 PreparedStatement ?我猜我需要确保只创建一次 PreparedStatement 而不是多次创建它。在我的当前架构中,最好的设计是什么,我该如何使用它?

2个回答

16

您可以创建一个缓存(这是一个相当基本的示例,以便让您有一个想法)来保存您需要的语句。让我们从创建将用作缓存的类开始。

private class StatementCache {
    Map<String, PreparedStatement> statementCache = new HashMap<>();
    public BoundStatement getStatement(String cql) {
        PreparedStatement ps = statementCache.get(cql);
        // no statement cached, create one and cache it now.
        if (ps == null) {
            ps = session.prepare(cql);
            statementCache.put(cql, ps);
        }
        return ps.bind();
    }
}

然后将一个实例添加到您的单例中:

public class TestCassandra {
    private Session session = null;
    private Cluster cluster = null;
    private StatementCache psCache = new StatementCache();
    // rest of class...

最后从您的函数中使用缓存:

private Set<String> getRandomUsers(String cql) {
// lots of code.    
        try {
            SimpleStatement query = new SimpleStatement(cql);
            query.setConsistencyLevel(ConsistencyLevel.QUORUM);
            // abstract the handling of the cache to it's own class.
            // this will need some work to make sure it's thread safe
            // as currently it's not.
            ResultSet res = session.execute(psCache.getStatement(cql));

@david 实际上,我会为每个n个线程设置一个缓存(您需要计算出何时地图的获取变得太昂贵,从而意味着需要新的共享缓存)。尝试像每5个线程1个缓存这样的东西。甚至每个线程都应该有自己的ps缓存(基于其中有多少语句,语句越多,所需的缓存就越少)。 - Lyuben Todorov
例子在混淆成问题时最为有效。假设您有2000个线程执行SELECT操作。这意味着您将与2000个线程共享1个缓存。从性能角度来看,这是一个糟糕的想法。相反,对于每n个线程,让我们具体点,比如说每20个线程,您可以共享一个缓存。这意味着您将拥有2000/20 = 100个准备好的语句缓存。如何做到这一点?您需要将线程集和它们各自的缓存进行映射。我将把实现留给您。还要注意,这些数字是随机的,需要进行负载测试以找到最佳缓存比例。 - Lyuben Todorov
@LyubenTodorov 但是当这些线程是Servlet请求时,该如何处理呢? - pinkpanther
@lining 是一个缓存实例,它被添加到单例中,因此它不是每个线程的缓存,而是一个单例缓存。这是为演示目的而快速编写的,生产应用程序可能需要一个正确构建的单例缓存。 - Lyuben Todorov
1
线程的缓存并不是必需的。"您应该只准备一次,并在应用程序中缓存PreparedStatement(它是线程安全的)。如果您使用相同的查询字符串多次调用prepare,则驱动程序将记录警告。" https://docs.datastax.com/en/developer/java-driver/3.0/manual/statements/prepared/ - Alisson Gomes
显示剩余3条评论

1

我的实现与上面分享的大致相同,但具有性能检查和处理竞争条件的实现。请查看代码中的内联注释以了解我的思路。

 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
 import nl.ing.creditcards.commons.activity.ActivityException;

 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;

public class StatementCache {

/* prevent cache incoherence issues*/
private static volatile StatementCache sCacheInstance;
private static final Map<String, PreparedStatement> holder = new ConcurrentHashMap<>();
private static final String NOT_PERMITTED = "Operation not permitted";

private StatementCache() {
    /*Prevent access through reflection api.*/
    if (sCacheInstance != null) {
        throw new ActivityException(NOT_PERMITTED, "Use getInstance() to retrieve the instance of this class");
    }
}

/**
 * Double check locking pattern usage for singleton classes
 *
 * @return
 */
public static StatementCache getInstance() {
    if (sCacheInstance == null) { //Check for the first time
        synchronized (StatementCache.class) { // second check in order to keep the operation atomic
            if (sCacheInstance == null) sCacheInstance = new StatementCache();
        }
    }
    return sCacheInstance;
}

/**
 * If {@link StatementCache#getStatement#prepared_statement} is already present in cache,
 * then we don't have to synchronize and make threads wait, otherwise, we synchronize the caching bit.
 *
 * @param session
 * @param cql
 * @return
 */
public PreparedStatement getStatement(Session session, String cql) {
    PreparedStatement prepared_statement = holder.get(cql);
    if (prepared_statement == null) {
        synchronized (this) {
            prepared_statement = holder.get(cql);
            if (prepared_statement == null) {
                prepared_statement = session.prepare(cql);
                holder.put(cql, prepared_statement);
            }
        }
    }
    return prepared_statement;
  }
}

使用这个缓存单例类就像这样简单:
public class CacheConsumer{

    private static Session session;

    CacheConsumer(Session session){
     this.session=session;
   }

    public void someMethod(){
      String cqlstatement = "SELECT * FROM SOME_TABLE";
      PreparedStatement statement= 
       StatementCache.getInstance().getStatement(session,cqlstatement);
         // You can now use the prepared statement however you wish.
   }
}

相当简单 ;)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接