如何将RDD[Row]转换回DataFrame?

11

我一直在尝试将RDD转换为DataFrame,然后再转回来。首先,我有一个类型为(Int, Int)的RDD,名为dataPair。然后,我使用以下代码创建了一个带有列标题的DataFrame对象:

val dataFrame = dataPair.toDF(header(0), header(1))

然后我使用以下代码将它从DataFrame转换回RDD:

val testRDD = dataFrame.rdd

使用.toDF将返回类型为org.apache.spark.sql.Row的RDD重新转换为RDD, 而不是(Int, Int)类型。但是,我尝试使用.toDF将其转换回RDD时出现了错误:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

我尝试为testRDD定义了一个类型为Data(Int, Int)的模式,但是遇到了类型不匹配的异常:

error: type mismatch;
found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
    val testRDD: RDD[Data] = dataFrame.rdd
                                       ^

我已经导入了

import sqlContext.implicits._
1个回答

25
要从一组行的RDD创建DataFrame,通常有两种主要方法:
1)您可以使用 toDF(),可以通过 import sqlContext.implicits._ 导入。但是,此方法仅适用于以下类型的RDD:
  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]
(来源:SQLContext.implicits对象的Scaladoc
实际上,最后一个签名意味着它可以用于元组的RDD或案例类的RDD(因为元组和案例类是scala.Product的子类)。
因此,要将此方法用于RDD [Row] ,您必须将其映射到 RDD [T <: scala.Product] 。这可以通过将每行映射到自定义案例类或元组来完成,如以下代码片段所示:
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

或者

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

这种方法的主要缺点(在我看来)是你必须在map函数中逐列显式设置生成DataFrame的模式。如果事先不知道模式,也许可以通过程序自动化完成,但那里可能会有一些混乱。因此,另有一种选择:

2)你可以使用createDataFrame(rowRDD: RDD[Row], schema: StructType),这个方法在SQLContext对象中可用。示例:

val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

请注意,没有必要显式设置任何模式列。我们重用旧的DF模式,它是StructType类,并且可以很容易地扩展。但是,这种方法有时不可行,在某些情况下可能比第一种方法效率低。
我希望现在比之前更清楚了。干杯。

1
似乎不再起作用了。 - thebluephantom

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接