Cassandra:如何使用CQL高效地插入新的宽行

7
我正在评估Cassandra。我正在使用DataStax驱动程序和CQL。
我想要存储一些具有以下内部结构的数据,其中每次更新的名称都不同。
+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+

因此,时间应该是列键,名称应该是行键。我用的CQL语句创建这个表:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

我希望模式是这样的,以便于查询。我偶尔需要存储超过65000行的更新。因此,使用cassandra列表/集合/映射数据类型不是一个选项。
我必须能够处理每秒至少1000个宽行插入,其中包含数量不定但很大(~1000)的名称/值对。
问题如下:我编写了一个简单的基准测试,每次进行1000个宽行插入,每个宽行包含10000个名称/值对。使用CQL和datastax驱动程序时,性能非常慢,而不使用CQL(使用astyanax)的版本在同一测试集群上具有良好的性能。
我已阅读related question,在该问题的被接受的答案中建议您应该能够通过使用批处理预准备语句(在cassandra 2中可用)原子且快速地创建新的宽行。
所以我尝试使用它们,但我仍然获得缓慢的性能(对于在本地主机上运行的小型三节点集群每秒两个插入)。我是否漏掉了一些显而易见的东西,还是必须使用较低级别的thrift API?我已经在astyanax中使用ColumnListMutation实现了相同的插入,并且每秒获得约30个插入。
如果我必须使用较低级别的thrift API:
- 它是否实际上已被弃用,还是因为它是较低级别而不便于使用? - 我能够使用CQL查询使用thrift api创建的表吗?
下面是一个包含在scala中的自包含代码示例。它只是创建了一个批处理语句,用于插入具有10000列的宽行,并重复计时插入性能。
我尝试了BatchStatement和一致性级别的选项,但没有任何方法可以让我获得更好的性能。
唯一的解释是,尽管批处理由准备好的语句组成,但条目仍逐一添加到行中。
package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

以下是 astyanax 代码(修改自astyanax example),执行速度比原来快15倍。请注意,此代码也不使用异步调用,因此这是一个公平的比较。需要注意的是,这要求列族已经存在,因为我还没有弄清楚如何使用 astyanax 创建它,而示例中也没有任何创建列族的代码。

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columsn
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

更新:我在cassandra-user邮件列表中找到了这个主题。当进行大型宽行插入时,使用CQL存在性能问题。有一个票CASSANDRA-6737来跟踪此问题。
更新2:我已经尝试了附加到CASSANDRA-6737的补丁,并确认该补丁完全修复了此问题。感谢DataStax的Sylvain Lebresne如此快速地解决了这个问题!
3个回答

8
你的代码中有一个错误,我认为这解释了你所看到的许多性能问题:对于每个批次,你都会重新准备语句。准备语句并不是非常昂贵的操作,但是像你这样做会增加很多延迟。等待语句准备的时间是你无法构建批次和Cassandra无法处理该批次的时间。一条准备好的语句只需要准备一次,然后就可以重复使用。
我认为大部分的性能问题都可以归结为延迟问题。瓶颈很可能是你的应用程序代码,而不是Cassandra。即使你只准备那个语句一次,你仍然会花费大部分时间在应用程序中被CPU绑定(构建大批次)或者什么也不做(等待网络和Cassandra)。
你可以做两件事情:首先使用CQL驱动程序的异步API,在网络和Cassandra忙于处理刚刚完成的批次时构建下一个批次;其次尝试运行多个线程执行相同的任务。确切的线程数需要进行实验,并且取决于你拥有的核心数量以及是否在同一台机器上运行一个或三个节点。
在同一台机器上运行三个节点的群集比单个节点更慢,而在不同的机器上运行则更快。此外,在同一台机器上运行应用程序并不能帮助提高性能。如果你想测试性能,要么只运行一个节点,要么在单独的机器上运行真正的集群。
批处理可以提供额外的性能,但并非总是如此。它们可能会导致你在测试代码中看到的问题:缓冲区膨胀。一旦批次变得太大,你的应用程序就会花费太多时间来构建它们,然后花费太多时间将它们推送到网络上,并且还需要等待Cassandra处理它们的时间。你需要尝试不同的批处理大小,并查看哪种效果最好(但要在真正的集群中进行测试,否则你无法看到网络的影响,这将是当你的批次变得更大时的一个重要因素)。
如果你使用批处理,请使用压缩。在大多数请求负载中,压缩没有任何区别(响应是另一回事),但是当你发送大量批次时,它可以产生很大的差异。
在Cassandra中,宽行写入没有什么特别之处。除了某些例外情况之外,模式不会改变处理写入所需的时间。我运行的应用程序每秒执行数万个非批量混合宽行和非宽行写入。集群不大,只有三到四个m1.xlarge的EC2节点。诀窍是在发送下一个请求之前永远不要等待上一个请求返回(这并不意味着“发射并忘记”,而是以同样的异步方式处理响应)。延迟是性能杀手。

我认为构建批处理不是罪魁祸首。但请注意,为了确保我不测量构建批处理的时间,计时措施仅涉及批处理的执行,而不涉及构建。我知道在本地主机上运行三个节点集群对性能来说并不是最好的选择。我也知道通过调整一些设置可以获得一些改进。但这些都无法解释为什么在同样在本地主机上运行的三个节点集群上使用Thrift API可以获得15倍的更好性能。 - Rüdiger Klaehn
我认为你正在测量错误的内容。重写测试代码以支持多线程,异步执行所有操作,并在几分钟内测量您通过的写入数量。仅测量批处理的延迟并不能充分反映实际性能。 - Theo
我已经修改了代码,使用压缩和executeAsync。但是没有任何区别。触发futures几乎是瞬间的,但是使用CQL进行1000次插入,每个插入有10000列,需要545秒,而非CQL版本(也修改为异步)只需要11秒 - Rüdiger Klaehn
1
我在我的机器上使用你的CQL进行了测试,使用Ruby驱动程序和Ruby运行,我可以在约1.5秒内运行一批10000个准备好的语句写入,但我也可以在约1.5秒内运行10000个异步请求。当你说你的1000批次的10000需要545秒时,你是只添加测试代码打印的测量值,还是整个运行时间?我假设是前者。 - Theo
使用异步时,单个时间基本上没有意义。因此,如果我说1000批次的10000需要545秒,我指的是直到整个1000批次完成的时间。 - Rüdiger Klaehn
显示剩余3条评论

5
你并不是唯一经历这种情况的人。我以前写过一篇博客文章,重点在于如何在CQL和thrift之间进行转换,但有链接指向邮件列表问题,涉及到相同的事情(宽行插入的性能问题是我最初调查的动机): http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html 总的来说,CQL非常适合那些刚接触Cassandra的人,因为它减轻了处理类型和理解数据库模型的负担。DataStax驱动程序编写得很好,并包含许多有用的功能。
然而,对于宽行插入,Thrift API要比CQL快得多。Netflix的博客没有深入探讨这个具体的用例。此外,只要人们在使用它(许多人在使用),Thrift API就不是遗留的。它是一个ASF项目,因此没有任何单一供应商运行它。
一般来说,对于任何基于Cassandra的应用程序,如果你找到一种满足或甚至超出工作负载性能要求的方法,请坚持使用它。

2
一些可以尝试的方法...在你的中(这是Cassandra 1.2.x,也许在2.x中参数的名称有些不同):
  • 禁用行缓存(row_cache_size_in_mb: 0
  • 在内存中的行溢出到磁盘之前增加内存限制(min_memory_compaction_limit_in_mb),只有在看到一些日志输出表明确实发生了溢出时才执行此操作
  • 确保正确配置num_tokens/initial_token值,以便将行分布在节点间
其他您可以尝试的事情:
  • 向客户端提供集群中所有节点的IP地址,而不仅仅是一个
  • 为每个Cassandra节点提供更多的RAM
  • 尝试运行多线程测试
  • 如果您在Linux上运行Cassandra,请确保已安装JNA并使用
需要澄清的事情:
  • 您是否通过nodetool确认3个节点已经找到对方?
  • nodetool如何显示您的3个节点的负载分配情况?
  • 您的虚拟群集的物理主机如何显示CPU和I/O使用情况?也许已经达到了最大值?

感谢调优技巧。但我认为这三个节点工作得很好。我进行了另一个测试,只插入了大型blob,性能也是可以接受的。Nodetool输出看起来也不错。我想我将不得不在thrift API中实现相同的基准测试,并查看其表现如何。 - Rüdiger Klaehn
我现在已经使用Astyanax thrift API(通过ColumnListMutation)基本上实现了相同的功能,并且我获得了15倍更好的性能。我很乐意使用较低级别的API。事实上,我更喜欢它比CQL更接近物理数据布局。但似乎不鼓励和弃用它的使用。 - Rüdiger Klaehn
根据Astyanax开发人员的说法,Thrift API仅略快于其他API http://techblog.netflix.com/2013/12/astyanax-update.html,因此必须有另一个原因导致这种情况。 - Alex Popescu
可能是我在使用预处理语句时做错了什么。但是是什么呢?这段代码非常简单。 - Rüdiger Klaehn

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接