如何从Spark ML随机森林获取与类相对应的概率

12
我一直在使用org.apache.spark.ml.Pipeline进行机器学习任务。对于预测标签,知道实际概率而非仅有预测标签尤为重要,但是我却难以获得实际概率。我正在使用随机森林进行二元分类任务,类别标签为“Yes”和“No”。我想输出"Yes"的概率。概率以DenseVector形式存储在管道输出中,例如[0.69, 0.31],但我不知道哪个对应于"Yes"(是0.69还是0.31?)。我猜测从labelIndexer中有一种方式可以检索到它。
以下是我的训练模型代码任务。
val sc = new SparkContext(new SparkConf().setAppName(" ML").setMaster("local"))
val data = .... // load data from file
val df = sqlContext.createDataFrame(data).toDF("label", "features")
val labelIndexer = new StringIndexer()
                      .setInputCol("label")
                      .setOutputCol("indexedLabel")
                      .fit(df)

val featureIndexer = new VectorIndexer()
                        .setInputCol("features")
                        .setOutputCol("indexedFeatures")
                        .setMaxCategories(2)
                        .fit(df)


// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

val Array(trainingData, testData) = df.randomSplit(Array(0.7, 0.3))


// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)
  .setFeatureSubsetStrategy("auto")
  .setImpurity("gini")
  .setMaxDepth(4)
  .setMaxBins(32)

// Create pipeline
val pipeline = new Pipeline()
    .setStages(Array(labelIndexer, featureIndexer, rf,labelConverter))

// Train model
val model = pipeline.fit(trainingData)

// Save model
sc.parallelize(Seq(model), 1).saveAsObjectFile("/my/path/pipeline")

接下来我将加载管道并对新数据进行预测,以下是代码片段:

// Ignoring loading data part

// Create DF
val testdf = sqlContext.createDataFrame(testData).toDF("features", "line")
// Load pipeline
val model = sc.objectFile[org.apache.spark.ml.PipelineModel]("/my/path/pipeline").first

// My Question comes here : How to extract the probability that corresponding to class label "1"
// This is my attempt, I would like to output probability for label "Yes" and predicted label . The probabilities are stored in a denseVector, but I don't know which one is corresponding to "Yes". Something like this:
val predictions = model.transform(testdf).select("probability").map(e=>   e.asInstanceOf[DenseVector])

参考文献有关于RF的概率和标签: http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forests

你是什么意思说 “我想输出标签“1”的概率和预测标签。 这些概率存储在DenseVector中作为管道输出,但我不知道哪一个对应于“1”。”? - eliasah
嗨,我已经更新了描述。基本上,我想输出与标签“是”相对应的概率。 - Qing
@Qing 你是怎么解决这个问题的? - Meethu Mathew
@Qing:你找到答案了吗?我们有一个包含2个概率的向量。哪个概率对应于标签的哪个类别?哪个概率对应于“是”,哪个概率对应于“否”...????? - Anneso
2个回答

0

从标签索引器中检索它是正确的方法。

有关更多信息,请参阅代码中的注释。

此示例适用于Scala 2.11.8和Spark 2.2.1。

import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.SparkConf
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Column, SparkSession}

object Example {

  case class Record(features: org.apache.spark.ml.linalg.Vector)

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder
      .appName("Example")
      .config(new SparkConf().setMaster("local[2]"))
      .getOrCreate

    val sc = spark.sparkContext

    import spark.implicits._

    val data = sc.parallelize(
      Array(
        (Vectors.dense(0.9, 0.6), "n"),
        (Vectors.dense(0.1, 0.1), "y"),
        (Vectors.dense(0.2, 0.15), "y"),
        (Vectors.dense(0.8, 0.9), "n"),
        (Vectors.dense(0.3, 0.4), "y"),
        (Vectors.dense(0.5, 0.5), "n"),
        (Vectors.dense(0.6, 0.7), "n"),
        (Vectors.dense(0.3, 0.3), "y"),
        (Vectors.dense(0.3, 0.3), "y"),
        (Vectors.dense(-0.5, -0.1), "dunno"),
        (Vectors.dense(-0.9, -0.6), "dunno")
      )).toDF("features", "label")

    // NOTE: you're fitting StringIndexer to all your data.
    // The StringIndexer orders the labels by label frequency.
    // In this example there are 5 "y" labels, 4 "n" labels
    // and 2 "dunno" labels, so the probability columns will be
    // listed in the following order: "y", "n", "dunno".
    // You can play with label frequencies to convince yourself
    // that it sorts labels by frequency in provided data.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("label_indexed")
      .fit(data)

    val indexToLabel = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predicted_label")
      .setLabels(labelIndexer.labels)

    // Here I use logistic regression, but the exact algorithm doesn't
    // matter in this case.
    val lr = new LogisticRegression()
      .setFeaturesCol("features")
      .setLabelCol("label_indexed")
      .setPredictionCol("prediction")

    val pipeline = new Pipeline().setStages(Array(
      labelIndexer,
      lr,
      indexToLabel
    ))

    val model = pipeline.fit(data)

    // Prepare test set
    val toPredictDf = sc.parallelize(Array(
      Record(Vectors.dense(0.1, 0.5)),
      Record(Vectors.dense(0.8, 0.8)),
      Record(Vectors.dense(-0.2, -0.5))
    )).toDF("features")

    // Make predictions
    val results = model.transform(toPredictDf)

    // The column containing probabilities has to be converted from Vector to Array
    val vecToArray = udf( (xs: org.apache.spark.ml.linalg.Vector) => xs.toArray )
    val dfArr = results.withColumn("probabilityArr" , vecToArray($"probability") )

    // labelIndexer.labels contains the list of your labels.
    // It is zipped with index to match the label name with
    // related probability found in probabilities array.
    // In other words:
    // label labelIndexer.labels.apply(idx)
    // matches:
    // col("probabilityArr").getItem(idx)
    // See also: https://dev59.com/Yarka4cB1Zd3GeqPiLz5#49917851
    val probColumns = labelIndexer.labels.zipWithIndex.map {
      case (alias, idx) => (alias, col("probabilityArr").getItem(idx).as(alias))
    }

    // 'probColumns' is of type Array[(String, Column)] so now 
    // concatenate these Column objects to DataFrame containing predictions
    // See also: https://dev59.com/Q6Dia4cB1Zd3GeqPMf2T#43494322
    val columnsAdded = probColumns.foldLeft(dfArr) { case (d, (colName, colContents)) =>
      if (d.columns.contains(colName)) {
        d
      } else {
        d.withColumn(colName, colContents)
      }
    }

    columnsAdded.show()
  }
}

运行此代码后,将生成以下数据框:

+-----------+---------------+--------------------+--------------------+--------------------+
|   features|predicted_label|                   y|                   n|               dunno|
+-----------+---------------+--------------------+--------------------+--------------------+
|  [0.1,0.5]|              y|  0.9999999999994298|5.702468131669394...|9.56953780171369E-19|
|  [0.8,0.8]|              n|5.850695258713685...|                 1.0|4.13416875406573E-81|
|[-0.2,-0.5]|          dunno|1.207908506571593...|8.157018363627128...|  0.9998792091493428|
+-----------+---------------+--------------------+--------------------+--------------------+

yndunno 是我们刚刚添加到 Spark ML 管道普通输出中的列。


0

你的意思是想从DenseVector中提取正标签的概率吗?如果是这样,你可以创建一个UDF函数来解决这个问题。 在二元分类的DenseVector中,第一列表示“0”的概率,第二列表示“1”的概率。

val prediction = pipelineModel.transform(result)
val pre = prediction.select(getOne($"probability")).withColumnRenamed("UDF(probability)","probability")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接