在Spark中访问数组列

11

一个 Spark DataFrame 包含一个类型为 Array[Double] 的列。当我试图在 map() 函数中获取它时,它会抛出 ClassCastException 异常。以下 Scala 代码会生成一个异常。

case class Dummy( x:Array[Double] )
val df = sqlContext.createDataFrame(Seq(Dummy(Array(1,2,3))))
val s = df.map( r => {
   val arr:Array[Double] = r.getAs[Array[Double]]("x")
   arr.sum
})
s.foreach(println)

例外情况是

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [D
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:24)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:23)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

谁能给我解释一下为什么它不起作用?我该怎么做呢? 我正在使用Spark 1.5.1和scala 2.10.6

谢谢

2个回答

25

ArrayTypeRow 中以 scala.collection.mutable.WrappedArray 的形式表示。你可以使用以下方法进行提取

val arr: Seq[Double] = r.getAs[Seq[Double]]("x")

或者
val i: Int = ???
val arr = r.getSeq[Double](i)

甚至更多:

import scala.collection.mutable.WrappedArray

val arr: WrappedArray[Double] = r.getAs[WrappedArray[Double]]("x")

如果DataFrame相对较小,那么模式匹配可能是更好的方法:
import org.apache.spark.sql.Row

df.rdd.map{case Row(x: Seq[Double]) => (x.toArray, x.sum)}

需要注意的是,该序列的类型未经检查。

在 Spark >= 1.6 中,您还可以使用以下 Dataset

df.select("x").as[Seq[Double]].rdd

0

这种方法也可以被考虑:

  val tuples = Seq(("Abhishek", "Sengupta", Seq("MATH", "PHYSICS")))
  val dF = tuples.toDF("firstName", "lastName", "subjects")

  case class StudentInfo(fName: String, lName: String, subjects: Seq[String])

  val students = dF
    .collect()
    .map(row => StudentInfo(row.getString(0), row.getString(1), row.getSeq(2)))

  students.foreach(println)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接