Converting an RDD to a DataFrame in Spark/Scala


The RDD has been created in the format Array[Array[String]] and has the following values:

val rdd : Array[Array[String]] = Array(
Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1"))

I want to create a DataFrame with the following schema:

val schemaString = "callId oCallId callTime duration calltype swId"

The next steps:

scala> val rowRDD = rdd.map(p => Array(p(0), p(1), p(2),p(3),p(4),p(5).trim))
rowRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[14] at map at <console>:39
scala> val calDF = sqlContext.createDataFrame(rowRDD, schema)

give the following error:
<console>:45: error: overloaded method value createDataFrame with alternatives:
     (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
    (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
    cannot be applied to (org.apache.spark.rdd.RDD[Array[String]],   
    org.apache.spark.sql.types.StructType)
       val calDF = sqlContext.createDataFrame(rowRDD, schema)
4 Answers


Just paste this into spark-shell:

val a = 
  Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd = sc.makeRDD(a)

case class X(callId: String, oCallId: String, 
  callTime: String, duration: String, calltype: String, swId: String)

Then use map() on the RDD to create instances of the case class, and create the DataFrame with toDF():

scala> val df = rdd.map { 
  case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5) }.toDF()
df: org.apache.spark.sql.DataFrame = 
  [callId: string, oCallId: string, callTime: string, 
    duration: string, calltype: string, swId: string]

This infers the schema from the case class.

Then you can proceed with:

scala> df.printSchema()
root
 |-- callId: string (nullable = true)
 |-- oCallId: string (nullable = true)
 |-- callTime: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- calltype: string (nullable = true)
 |-- swId: string (nullable = true)

scala> df.show()
+----------+-------+-------------------+--------+--------+----+
|    callId|oCallId|           callTime|duration|calltype|swId|
+----------+-------+-------------------+--------+--------+----+
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
|4580056797|      0|2015-07-29 10:38:42|       0|       1|   1|
+----------+-------+-------------------+--------+--------+----+

If you want to use toDF() in a normal program (not in spark-shell), make sure (quoted from here; a minimal sketch follows the list):

  • to import sqlContext.implicits._ right after creating the SQLContext
  • to define the case class outside of the method that uses toDF()
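
For example, a minimal standalone sketch of those two rules might look like this (assuming Spark 1.x; the object name ToDFExample and its run method are made up for illustration):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Rule 2: the case class lives outside the method that calls toDF()
case class X(callId: String, oCallId: String,
  callTime: String, duration: String, calltype: String, swId: String)

object ToDFExample {
  def run(sc: SparkContext): Unit = {
    val sqlContext = new SQLContext(sc)
    // Rule 1: import the implicits right after creating the SQLContext
    import sqlContext.implicits._

    val rdd = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1")))
    val df = rdd.map {
      case Array(s0, s1, s2, s3, s4, s5) => X(s0, s1, s2, s3, s4, s5)
    }.toDF()
    df.show()
  }
}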

You need to convert your Array into Rows first, and then define the schema. I assumed most of your fields are Long:
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

    val rdd: RDD[Array[String]] = ???
    // Match all six fields of each record (callId .. swId)
    val rows: RDD[Row] = rdd map {
      case Array(callId, oCallId, callTime, duration, calltype, swId) =>
        Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, calltype.toLong, swId.toLong)
    }

    object schema {
      val callId = StructField("callId", LongType)
      val oCallId = StructField("oCallId", LongType)
      val callTime = StructField("callTime", StringType)
      val duration = StructField("duration", LongType)
      val calltype = StructField("calltype", LongType)
      val swId = StructField("swId", LongType)

      val struct = StructType(Array(callId, oCallId, callTime, duration, calltype, swId))
    }

    sqlContext.createDataFrame(rows, schema.struct)
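
To try this in spark-shell, you could replace the ??? with a real RDD built from the question's sample data (a quick sketch, reusing the definitions above):

    // Hypothetical quick test with the question's sample records
    val sample: RDD[Array[String]] = sc.makeRDD(Seq(
      Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"),
      Array("4580056797", "0", "2015-07-29 10:38:43", "0", "1", "1")))
    val sampleRows = sample map {
      case Array(callId, oCallId, callTime, duration, calltype, swId) =>
        Row(callId.toLong, oCallId.toLong, callTime, duration.toLong, calltype.toLong, swId.toLong)
    }
    sqlContext.createDataFrame(sampleRows, schema.struct).show()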


Using spark 1.6.1 with scala 2.10

I got the same error: error: overloaded method value createDataFrame with alternatives:

For me, the gotcha was the signature of createDataFrame: I tried to use a val rdd : List[Row], but it failed because java.util.List[org.apache.spark.sql.Row] and scala.collection.immutable.List[org.apache.spark.sql.Row] are different types.

The working solution I found was to convert val rdd : Array[Array[String]] into RDD[Row] via List[Array[String]]. I find this the closest to the approach in the documentation.


import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType,StructField,StringType};
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val rdd_original : Array[Array[String]] = Array(
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"), 
    Array("4580056797", "0", "2015-07-29 10:38:42", "0", "1", "1"))

val rdd : List[Array[String]] = rdd_original.toList

val schemaString = "callId oCallId callTime duration calltype swId"

// Generate the schema based on the string of schema
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD to Rows.
val rowRDD = rdd.map(p => Row(p: _*)) // using splat is easier
// val rowRDD = rdd.map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5))) // this also works

val df = sqlContext.createDataFrame(sc.parallelize(rowRDD: List[Row]), schema)
df.show

I assume your schema is, like in the Spark Programming Guide, as follows:
val schema =
  StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

If you look at the signatures of createDataFrame, here is the one that accepts a StructType as the second argument (for Scala):
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame
Creates a DataFrame from an RDD containing Rows using the given schema.
So it accepts an RDD[Row] as its first argument. What you have in rowRDD is an RDD[Array[String]], hence the mismatch.
Do you need an RDD[Array[String]]?
Otherwise you can use the following to create your DataFrame:
val rowRDD = rdd.map(p => Row(p(0), p(1), p(2),p(3),p(4),p(5).trim))
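
Assuming rdd here is an actual RDD[Array[String]] (e.g. created with sc.parallelize) rather than a plain Array, rowRDD is then an RDD[Row] and the question's original call works:

// rowRDD: RDD[Row] now matches the signature shown above
val calDF = sqlContext.createDataFrame(rowRDD, schema)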

What if some element p of the RDD has a null value and needs a toInt conversion before becoming a Row, as in: val rowRDD = rdd.map(p => Row(p(0), p(1).toInt, p(2), p(3).toFloat, p(4), p(5).trim))? In my original rdd both p(1) and p(3) contain null values, which makes this fail, so I had to remove the toInt and toFloat, but that didn't help either, because my schema definition requires int and float. - Steven Park
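
One way around the nulls (a sketch, not from the original answers; the helpers toIntOrNull and toFloatOrNull are made up here) is to box the numeric conversions so that null or empty strings become SQL NULLs, which is legal as long as those schema fields are declared nullable:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType}

// Hypothetical helpers: return a boxed null when the input is missing
def toIntOrNull(s: String): java.lang.Integer =
  if (s == null || s.isEmpty) null else s.toInt

def toFloatOrNull(s: String): java.lang.Float =
  if (s == null || s.isEmpty) null else s.toFloat

val rowRDD = rdd.map(p =>
  Row(p(0), toIntOrNull(p(1)), p(2), toFloatOrNull(p(3)), p(4), p(5).trim))

// The numeric fields must be declared nullable (the third argument)
val schema = StructType(Array(
  StructField("callId", StringType, true),
  StructField("oCallId", IntegerType, true),
  StructField("callTime", StringType, true),
  StructField("duration", FloatType, true),
  StructField("calltype", StringType, true),
  StructField("swId", StringType, true)))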
