我想使用Spark进行一些ETL操作,主要包括“更新”语句(列是集合,需要追加内容,因此简单的插入可能行不通)。因此,似乎发出CQL查询以导入数据是最好的选择。使用Spark Cassandra 连接器,我看到可以这样做:https://github.com/datastax/spark-cassandra-connector/blob/master/doc/1_connecting.md#connecting-manually-to-cassandra
现在我不想为源中的每一行打开并关闭一个会话(不想这样做是否正确?通常,我在“正常”的应用程序中为整个处理过程使用一个会话并继续使用它)。但是,它说连接器是可序列化的,但会话显然不是。因此,将整个导入程序包装在单个“withSessionDo”内似乎会引起问题。我考虑使用类似于这样的东西:
class CassandraStorage(conf:SparkConf) {
val session = CassandraConnector(conf).openSession()
def store (t:Thingy) : Unit = {
//session.execute cql goes here
}
}
这是一个好方法吗?我需要担心关闭会话吗?在哪里/怎样最好地做到这一点? 欢迎任何指导。