如何使用Datastax Java驱动程序的异步/批量写入功能

8

我计划使用Datastax Java驱动程序来写入Cassandra。我主要对Datastax Java驱动程序的批量写入异步功能感兴趣,但是我找不到任何教程可以解释如何在下面使用Datastax Java驱动程序并实现这些功能。

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

在下面的代码中,我正在使用Datastax Java驱动程序创建一个连接到Cassandra节点的连接。
/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}

请问有没有人能够帮助我如何在上面的代码中添加批量写入或异步特性。谢谢你的帮助。

我正在运行Cassandra 1.2.9版本。

2个回答

8
对于异步操作,只需使用executeAsync函数即可:
...
DatastaxConnection.getSession().executeAsync(query);

对于批处理,您需要构建查询(我使用字符串,因为编译器知道如何优化字符串连接):

String cql =  "BEGIN BATCH "
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

顺便提一下,我认为您的语句绑定可能类似于这样,假设您将属性更改为一个映射列表,其中每个映射表示批处理中的更新/插入:

BoundStatement query = prepStatement.bind(userId,
                                          attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                          userId_2,
                                          attributesList.get(1).values().toArray(new Object[attributes.size()])); 

有没有一种使用命名参数来完成这个的方法? - Highstead
1
@Highstead 用什么编程语言?上述代码是Java (有点像不是)。 - Lyuben Todorov
@Highstead Python = 是的,支持命名参数,示例在此,使用较新的Python DataStax驱动程序。 - Lyuben Todorov
这是一个客户端驱动程序,因此它是客户端侧的。您可以在自己的代码中完成此操作,然后将新数据推送到Cassandra服务器中。 - Lyuben Todorov
主要关注的是注入问题,还是这是我在Cassandra中需要负责的事情? - Highstead
显示剩余5条评论

6

对于Lyuben的回答中提供的示例,如果您需要更新计数器,使用字符串设置批处理的某些属性,例如Type.COUNTER将不起作用。相反,您可以按以下方式安排批处理的预准备语句:

final String insertQuery = "INSERT INTO test.prepared (id, col_1) VALUES (?,?);";
final PreparedStatement prepared = session.prepare(insertQuery);

final BatchStatement batch = new BatchStatement(BatchStatement.Type.UNLOGGED);
batch.add(prepared.bind(userId1, "something"));
batch.add(prepared.bind(userId2, "another"));
batch.add(prepared.bind(userId3, "thing"));

session.executeAsync(batch);

1
我喜欢这个比被接受的答案更好。在这里,批处理的内容可以是动态的(与被接受的答案中固定的CQL和参数数量不同)。 - 0cd
我认为这是糟糕的代码(截至2019年)。BatchStatement是不可变的。你需要批量添加= batch.add(...)。 - Tony Schwartz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接