Cassandra集群插入性能差且插入稳定性不佳

7

我需要每个客户端每秒存储大约250个数字值,每小时大约为900k。这可能不会是全天记录(可能在5-10小时之间),但我将根据客户ID和读数日期对数据进行分区。最大行长度约为22-23M,仍然可管理。尽管如此,我的方案如下:

CREATE TABLE measurement (
  clientid text,
  date text,
  event_time timestamp,
  value int,
  PRIMARY KEY ((clientid,date), event_time)
);

这个 keyspace 的复制因子是 2,仅用于测试,snitch 是 GossipingPropertyFileSnitchNetworkTopologyStrategy。我知道复制因子 3 更符合生产标准。

接下来,在公司的服务器上创建了一个小集群,由三个裸机虚拟机组成,每个虚拟机都有 2 个 CPU x 2 核和 16GB 的 RAM,还有很多空间。我和它们在 gigabit LAN 中。该集群是基于 nodetool 运行的。

这是我用来测试我的设置的代码:

        Cluster cluster = Cluster.builder()
                .addContactPoint("192.168.1.100")
                .addContactPoint("192.168.1.102")
                .build();
        Session session = cluster.connect();
        DateTime time = DateTime.now();
        BlockingQueue<BatchStatement> queryQueue = new ArrayBlockingQueue(50, true);

    try {

        ExecutorService pool = Executors.newFixedThreadPool(15); //changed the pool size also to throttle inserts

        String insertQuery = "insert into keyspace.measurement (clientid,date,event_time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement = session.prepare(insertQuery);
        BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED); //tried with unlogged also

        //generating the entries
        for (int i = 0; i < 900000; i++) { //900000 entries is an hour worth of measurements
            time = time.plus(4); //4ms between each entry
            BoundStatement bound = preparedStatement.bind("1", "2014-01-01", time.toDate(), 1); //value not important
            batch.add(bound);

            //The batch statement must have 65535 statements at most
            if (batch.size() >= 65534) {
                queryQueue.put(batch);
                batch = new BatchStatement();
            }
        }
        queryQueue.put(batch); //the last batch, perhaps shorter than 65535

        //storing the data
        System.out.println("Starting storing");
        while (!queryQueue.isEmpty()) {
            pool.execute(() -> {
                try {

                    long threadId = Thread.currentThread().getId();
                    System.out.println("Started: " + threadId);
                    BatchStatement statement = queryQueue.take();
                    long start2 = System.currentTimeMillis();
                    session.execute(statement);
                    System.out.println("Finished " + threadId + ": " + (System.currentTimeMillis() - start2));
                } catch (Exception ex) {
                    System.out.println(ex.toString());
                }
            });

        }
        pool.shutdown();
        pool.awaitTermination(120,TimeUnit.SECONDS);


    } catch (Exception ex) {
        System.out.println(ex.toString());
    } finally {
        session.close();
        cluster.close();
    }

我通过阅读这里和其他博客和网站的文章来编写代码。据我所知,对于客户端使用多个线程很重要,这就是我这样做的原因。我还尝试使用异步操作。
最终结果是不管我使用哪种方法,一个批次需要5-6秒才能执行完成,尽管可能需要长达10秒。如果我只输入一个批次(即仅约65k列),或者使用一个愚蠢的单线程应用程序,它需要相同的时间。老实说,我期望会更好一些。特别是因为我在本地实例上使用笔记本电脑时得到了更多或少相似的性能。
第二个,也许更重要的问题是,我面临着无法预测的异常情况。这两个是:
com.datastax.driver.core.exceptions.WriteTimeoutException:Cassandra在一致性ONE (1副本需要但只有0个确认写入)的写查询期间超时

com.datastax.driver.core.exceptions.NoHostAvailableException:所有尝试查询的主机都失败了(尝试过:/192.168.1.102:9042(com.datastax.driver.core.TransportException:[/192.168.1.102:9042]连接已关闭),/192.168.1.100:9042(com.datastax.driver.core.TransportException:[/192.168.1.100:9042]连接已关闭),/192.168.1.101:9042(com.datastax.driver.core.TransportException:[/192.168.1.101:9042]连接已关闭))
总的来说,我做错了什么吗?我应该重新组织加载数据的方式,或者改变方案。我尝试缩短行长度(所以我有12小时的行),但这没有什么大的区别。
更新:
我很粗鲁,忘记在回答问题后粘贴我使用的代码示例。它的效果还可以,但是我正在继续使用KairosDB和Astyanax进行二进制传输的研究。看起来,我可以通过它们获得比CQL更好的性能,尽管KairosDB在超载时可能会出现一些问题(但我正在解决此问题),而Astyanax对我来说有点冗长。尽管如此,这是代码,我可能在某个地方弄错了。当信号量槽号超过5000时,对性能没有影响,几乎是恒定的。
String insertQuery = "insert into keyspace.measurement     (userid,time_by_hour,time,value) values (?, ?, ?, ?)";
        PreparedStatement preparedStatement =     session.prepare(insertQuery);
        Semaphore semaphore = new Semaphore(15000);

    System.out.println("Starting " + Thread.currentThread().getId());
    DateTime time = DateTime.parse("2015-01-05T12:00:00");
    //generating the entries
    long start = System.currentTimeMillis();

    for (int i = 0; i < 900000; i++) { 

        BoundStatement statement = preparedStatement.bind("User1", "2015-01-05:" + time.hourOfDay().get(), time.toDate(), 500); //value not important
        semaphore.acquire();
        ResultSetFuture resultSetFuture = session.executeAsync(statement);
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {

                semaphore.release();
            }

            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("Error: " + throwable.toString());
                semaphore.release();
            }
        });
        time = time.plus(4); //4ms between each entry
    }
1个回答

4

1
Spod是正确的。在Cassandra中的批处理并不是性能优化。只有在需要原子性且实现原子写入会带来性能惩罚时,才应使用已记录的批处理。即使是未记录的批处理通常也比直接异步查询更慢,因为它们本质上强制执行不必要的协调(除非您按键进行分批处理并使用基于标记的感知--也许您在这里)。我倾向于建议直接异步写入。这是另一篇支持这种观点的文章:http://lostechies.com/ryansvihla/2014/08/28/cassandra-batch-loading-without-the-batch-keyword/ - phact
1
关于超时问题,当您开始用太多的写入操作压倒您的c*节点时,这种情况就会发生。使用异步查询很容易出现这种情况,因为您的程序会不停地生成写入操作。在删除批处理(特别是日志记录)后,您应该会看到改进,但如果您的SLA允许,您可能需要限制写入操作或甚至增加超时时间。 - phact
1
总之,你是对的。所谓的网络开销(如果有的话?)不是使用批处理语句的理由。我已经回到了一个更简单的解决方案,使用异步操作,并且达到了基准测试中的数字,在一个3节点集群上大约为30k ops。我使用了信号量进行拥塞控制,我可以轻松地将其增加到10000个插槽,但我注意到性能上限要低得多(但仍然稳定)。我没有触及超时。此外,请确保您与服务器之间有可靠的有线连接!在这之前,我使用的是WiFi,那并不好玩。非常感谢你们两个。 - Aleksandar Stojadinovic
@sedovav,我已经添加了代码,请试用。当然,欢迎其他人查看,我可能会漏掉一些东西。 - Aleksandar Stojadinovic
如果在信号量大小为1的情况下失败,就像Stefan所说的那样,那么你可能有其他问题。我遇到GC超时限制的唯一情况是当我使用同步执行并且插入大小真的很大时。 - Aleksandar Stojadinovic
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接