我有一个Cassandra数据库,通过Apache Spark使用SparkSQL分析了其中的数据。现在我想将这些分析后的数据插入到PostgreSQL中。除了使用PostgreSQL驱动程序(我已经使用postREST和Driver实现了),是否有其他直接实现的方法,例如saveToCassandra()
?
我有一个Cassandra数据库,通过Apache Spark使用SparkSQL分析了其中的数据。现在我想将这些分析后的数据插入到PostgreSQL中。除了使用PostgreSQL驱动程序(我已经使用postREST和Driver实现了),是否有其他直接实现的方法,例如saveToCassandra()
?
repartition
,如果数量较高 - 使用coalesce
调整为50个分区mapPartition
转换,在其中调用使用JDBC向您的DBMS插入记录的函数。在此函数中,您打开到数据库的连接,并使用COPY命令及其API,它可以消除每个记录的单独命令的需求-这样插入将处理得更快def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
val sb = mutable.StringBuilder.newBuilder
val now = System.currentTimeMillis()
rdd.collect().foreach(itr => {
itr.foreach(_.createCSV(sb, now).append("\n"))
})
copyIn("myTable", new StringReader(sb.toString), "statement")
sb.clear
}
def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
val conn = connectionPool.getConnection()
try {
conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
} catch {
case se: SQLException => logWarning(se.getMessage)
case t: Throwable => logWarning(t.getMessage)
} finally {
conn.close()
}
}
0x0FFF的回答很好。这里还有一个有用的补充。
我使用foreachPartition
将数据持久化到外部存储。这也符合Spark文档中给出的Design Patterns for using foreachRDD
设计模式。
https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams
示例:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html