将Spark DataFrame转换为RDD并返回

3

我正在使用Scala编写一个Apache Spark应用程序。为了处理和存储数据,我使用DataFrames。我有一个不错的流水线,包括特征提取和MultiLayerPerceptron分类器,使用ML API。

我还想使用SVM(为了比较目的)。问题是(如果我说错了请纠正),只有MLLib提供SVM。而MLLib还没有准备好处理DataFrames,只能处理RDDs。

因此,我认为可以使用DataFrames维护我的应用程序核心,并使用SVM来1)仅将我需要的DataFrame列转换为RDD [LabeledPoint],并且2)在分类后将SVM预测添加到DataFrame作为新列。

第一部分我使用了一个小函数来完成:

private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
}

由于特征提取方法使用的是ML API而不是MLLib,因此我必须指定和转换向量的类型。

然后,将这个RDD[LabeledPoint]输入SVM进行分类,一切顺利,没有问题。最后,按照Spark的示例,我得到了一个RDD[Double]

val predictions = rdd.map(point => model.predict(point.features))

现在,我想将预测分数作为列添加到原始DataFrame中并返回它。这就是我卡住的地方。我可以使用RDD[Double]将其转换为DataFrame
(sql context ommited)
import sqlContext.implicits._
val plDF = predictions.toDF("prediction")

但是如何将第二个DataFrame作为原始DataFrame的一列加入其中呢?我尝试使用joinunion方法,但由于DataFrames上没有相同的列进行连接或合并,所以遇到了SQL异常。

编辑

data.withColumn("prediction", plDF.col("prediction"))

但是我遇到了一个分析异常 :(

1
我使用了https://dev59.com/aVwY5IYBdhLWcg3wNFdY中的解决方案,将我的预测与原始框架合并,这是否类似于您正在寻找的内容? - James Tobin
我知道它是如何工作的,但将其转换为RDD再加入创建数据框?似乎更像是一种变通方法。 - Camandros
1个回答

0

我还没有想出如何不使用RDD来解决它,但是无论如何,这是我用RDD解决它的方法。添加了其余的代码,以便任何人都可以理解完整的逻辑。欢迎提出任何建议。

package stuff

import java.util.logging.{Level, Logger}

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

/**
  * Created by camandros on 10-03-2017.
  */
class LinearSVMClassifier extends Classifier with Serializable{

  @transient lazy val log: Logger = Logger.getLogger(getClass.getName)

  private var model : SVMModel = _

  override def train(data : DataFrame): Unit = {
    val rdd = dataFrameToRDD(data)
    // Run training algorithm to build the model
    val numIter : Int = 100
    val step = Osint.properties(Osint.SVM_STEPSIZE).toDouble
    val c = Osint.properties(Osint.SVM_C).toDouble
    log.log(Level.INFO, "Initiating SVM training with parameters: C="+c+", step="+step)
    model = SVMWithSGD.train(rdd, numIterations = numIter, stepSize = step, regParam = c)
    log.log(Level.INFO, "Model training finished")

    // Clear the default threshold.
    model.clearThreshold()
  }

  override def classify(data : DataFrame): DataFrame = {
    log.log(Level.INFO, "Converting DataFrame to RDD")
    val rdd = dataFrameToRDD(data)
    log.log(Level.INFO, "Conversion finished; beginning classification")
    // Compute raw scores on the test set.
    val predictions = rdd.map(point => model.predict(point.features))
    log.log(Level.INFO, "Classification finished; Transforming RDD to DataFrame")

    val sqlContext : SQLContext = Osint.spark.sqlContext
    val tupleRDD = data.rdd.zip(predictions).map(t => Row.fromSeq(t._1.toSeq ++ Seq(t._2)))
    sqlContext.createDataFrame(tupleRDD, data.schema.add("predictions", "Double"))

    //TODO this should work it doesn't since this "withColumn" method seems to be applicable only to add
    // new columns using information from the same dataframe; therefore I am using the horrible rdd conversion
    //val sqlContext : SQLContext = Osint.spark.sqlContext
    //import sqlContext.implicits._
    //val plDF = predictions.toDF("predictions")
    //data.withColumn("prediction", plDF.col("predictions"))
  }

  private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接