Spark Cassandra Connector正确使用方法

4
我想使用Spark进行一些ETL操作,主要包括“更新”语句(列是集合,需要追加内容,因此简单的插入可能行不通)。因此,似乎发出CQL查询以导入数据是最好的选择。使用Spark Cassandra 连接器,我看到可以这样做:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra 现在我不想为源中的每一行打开并关闭一个会话(不想这样做是否正确?通常,我在“正常”的应用程序中为整个处理过程使用一个会话并继续使用它)。但是,它说连接器是可序列化的,但会话显然不是。因此,将整个导入程序包装在单个“withSessionDo”内似乎会引起问题。我考虑使用类似于这样的东西:
class CassandraStorage(conf:SparkConf) {
  val session = CassandraConnector(conf).openSession()
  def store (t:Thingy) : Unit = {
    //session.execute cql goes here
  }
}

这是一个好方法吗?我需要担心关闭会话吗?在哪里/怎样最好地做到这一点? 欢迎任何指导。


我在想,为什么你不想创建Spark Conf对象并在Spark上下文中引用它,就像连接器页面上方的示例所显示的那样呢?你应该只需要创建conf对象,然后在运行查询时保持上下文打开状态。 - markc
1个回答

4

你确实需要使用withSessionDo,因为它不会在每次访问时打开和关闭会话。在幕后,withSessionDo访问JVM级别的会话。这意味着您每个节点的每个集群配置只有一个会话对象。

这意味着像这样的代码:

val connector = CassandraConnector(sc.getConf)
sc.parallelize(1 to 10000000L).map(connector.withSessionDo( Session => stuff)

无论每台机器有多少个核心,每个执行器 JVM 只会创建一个集群和会话对象。

为了提高效率,我仍然建议使用 mapPartitions 来最小化缓存检查。

sc.parallelize(1 to 10000000L)
  .mapPartitions(it => connector.withSessionDo( session => 
      it.map( row => do stuff here )))

此外,会话对象还使用一个准备缓存,它允许您在序列化代码中缓存一个准备好的语句,并且只会在每个 JVM 中预备一次(所有其他调用都将返回缓存引用)。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接