Scala - 将Resultset转换为Spark Dataframe

3

我正在查询MySQL表。

val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"
val driver = "com.mysql.jdbc.Driver"
val username = "XXX"
val password = "XXX"
var connection:Connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val patnerName = statement.executeQuery("SELECT id,name FROM partner")

我在partnerName中得到了结果,但需要将其转换为数据框(Dataframe)。

我可以通过以下代码打印数据:

while (patnerName.next) {
  val id = patnerName.getString("id")
  val name = patnerName.getString("name")
  println("id = %s, name = %s".format(id,name))
}

现在我如何将patnerName转换为DataFrame?
2个回答

5
所以,你需要分几步来完成:
  1. 定义你的列并准备一个模式
    val columns = Seq("id", "name")
    val schema = StructType(List(
      StructField("id", StringType, nullable = true),
      StructField("name", StringType, nullable = true)
    ))

定义如何在每次迭代中将ResultSet中的每个记录转换为行。
    def parseResultSet(rs: ResultSet): Row = {
      val resultSetRecord = columns.map(c => rs.getString(c))
      Row(resultSetRecord:_*)
    }
  1. 定义一个函数,将您的ResultSet转换为Iterator[Row]。在下一步调用它时,它将使用您在上一步中定义的函数。
    def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
      new Iterator[Row] {
        def hasNext: Boolean = rs.next()
        def next(): Row = f(rs)
      }
  1. 定义一个函数,将Iterator[Row].toSeq转换成RDD,并使用前一步中定义的函数。使用模式创建一个DataFrame。
    def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
      val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
      spark.createDataFrame(rdd, schema) // use the schema you defined in step 1
    }

最后调用您的函数。
    val df: DataFrame = parallelizeResultSet(patner, spark)

我尝试了上述方法,我能够创建数据框架,但它对我来说是创建空数据框架,没有来自我的结果集的数据。 - Saurabh
@Saurabh 你能验证ResultSet是否包含任何数据吗?尝试使用rs.next(),然后使用rs.getString("col_name")从当前行选择一列并查看是否返回数据。 - kfkhalili
是的,我使用rs.getString("col_name")验证了我的结果集中是否有数据。它已经按照我提供的结构创建了数据框架。 - Saurabh

3
直接使用Spark功能怎么样?
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/")
  .option("dbtable", "pg_partner")
  .option("user", "XXX")
  .option("password", "XXX")
  .load()

这段代码取自这里

spark.read中的spark指的是什么?是SparkContext还是SQLContext - toofrellik
你正在使用哪个Spark版本?在Spark 2中出现了SparkSession - Anton Okolnychyi
我正在使用Spark 2.0.0,我的代码中使用了以下内容:val sparkConf = new SparkConf().setAppName("QuaterlyAudit").setMaster("local") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc)sc无法工作,我知道sc是SparkContext,但是SparkSession是什么? - toofrellik
我得到了相同的错误代码:Exception in thread "main" java.sql.SQLException: No suitable driver - Tom Tang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接