将Spark分析数据插入到Postgres中。

9

我有一个Cassandra数据库,通过Apache Spark使用SparkSQL分析了其中的数据。现在我想将这些分析后的数据插入到PostgreSQL中。除了使用PostgreSQL驱动程序(我已经使用postREST和Driver实现了),是否有其他直接实现的方法,例如saveToCassandra()

4个回答

12
目前没有原生实现将RDD写入任何DBMS的方法。以下是Spark用户列表中相关讨论的链接:onetwo
一般来说,最有效的方法如下:
  1. 验证RDD分区数的数量,不应该太低也不应该太高。20-50个分区应该是可以的,如果数量较低 - 使用20个分区调用repartition,如果数量较高 - 使用coalesce调整为50个分区
  2. 调用mapPartition转换,在其中调用使用JDBC向您的DBMS插入记录的函数。在此函数中,您打开到数据库的连接,并使用COPY命令及其API,它可以消除每个记录的单独命令的需求-这样插入将处理得更快
这样,您将以并行方式将数据插入Postgres中,利用多达50个并行连接(取决于Spark集群大小及其配置)。整个方法可能会实施为接受RDD和连接字符串的Java / Scala函数。

2
您可以使用Postgres的copy api来编写它,这样速度会更快。请参见以下两种方法-一种迭代RDD以填充可由copy api保存的缓冲区。您唯一需要注意的是创建正确的csv格式语句,该语句将由copy api使用。
def saveToDB(rdd: RDD[Iterable[EventModel]]): Unit = {
        val sb = mutable.StringBuilder.newBuilder
        val now = System.currentTimeMillis()

        rdd.collect().foreach(itr => {
            itr.foreach(_.createCSV(sb, now).append("\n"))
        })

        copyIn("myTable",  new StringReader(sb.toString), "statement")
        sb.clear
    }


def copyIn(tableName: String, reader: java.io.Reader, columnStmt: String = "") = {
        val conn = connectionPool.getConnection()
        try {
            conn.unwrap(classOf[PGConnection]).getCopyAPI.copyIn(s"COPY $tableName $columnStmt FROM STDIN WITH CSV", reader)
        } catch {
            case se: SQLException => logWarning(se.getMessage)
            case t: Throwable => logWarning(t.getMessage)
        } finally {
            conn.close()
        }
    }

StringBuilder缓冲区不会随着EventModel RDD中记录数量的增加而无限增长吗?为什么不会耗尽内存? - nont
我已经使用这个解决方案运行了几个月,到目前为止,我还没有看到它出现内存不足的情况。我拥有的数据量也相当大 - 每秒100000条。此外,如果您对此有疑虑,您可以根据另一个检查调用copyIn并清除缓冲区。 - smishra

1

0x0FFF的回答很好。这里还有一个有用的补充。

我使用foreachPartition将数据持久化到外部存储。这也符合Spark文档中给出的Design Patterns for using foreachRDD设计模式。 https://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#output-operations-on-dstreams

示例:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

1
上面的回答是关于旧版 Spark 的,而在 Spark 2.* 中有 JDBC 连接器,可以直接从 DataFrame 写入到 RDBS。
示例:
jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接