使用Pipeline基于分区的DataFrame创建多个Spark MLlib模型

4

scala> spark.version res8: String = 2.2.0

我正在处理一个包含列locationID的spark Dataframe。我已经创建了一个MLlib管道来构建线性回归模型,并且当我为单个locationID提供数据时,它可以正常工作。我现在想为每个'locationID'创建多个模型(生产中可能有几千个locationID)。我想保存每个模型的系数。

我不确定如何在Scala中完成这项任务。

我的管道定义如下:

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql


// Load the regression input data
val mydata = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("./inputdata.csv")

// Crate month one hot encoding
val monthIndexer = new StringIndexer()
  .setInputCol("month")
  .setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
  .setInputCol(monthIndexer.getOutputCol)
  .setOutputCol("monthVec")
val assembler =  new VectorAssembler()
  .setInputCols(Array("monthVec","tran_adr"))
  .setOutputCol("features")
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
val pipeline = new Pipeline()
  .setStages(Array(monthIndexer, monthEncoder, assembler, lr))


// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)

我可以这样获取模型的详细信息:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]

println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

现在我想按mydata中的locationID列分组,并在数据的每个分区上运行流水线。

我尝试使用groupby,但我只能进行聚合。

val grouped = mydata.groupBy("locationID")

我也尝试过将唯一的locationID作为列表提取并循环遍历:

val locationList = mydata.select(mydata("prop_code")).distinct

locationList.foreach { printLn }

我知道Spark不适合创建许多较小的模型,最适合在大型数据集上创建一个模型,但作为概念验证,我被要求这样做。

在Spark中,如何正确处理这样的任务?

2个回答

4

在Spark中做类似这样的事情的正确方法是什么?

我敢说根本没有好的方法。有许多先进的工具可以处理内核数据处理和许多任务调度库可以用于编排独立学习任务。Spark在这方面根本没有提供任何东西。

它的调度能力很一般,ML / MLlib 工具也是如此,当每个任务都是独立的时,扩展性和容错性也没有用处。

您可以使用Spark进行通用目的的调度(如果您不介意使用Python,则可以使用 sklearn keyed models 实现此想法),但仅此而已。


谢谢。我已经用Python编写了这个程序,现在想为了基准测试的目的在Scala/Spark中实现相同的功能。目前我只是想得到一个能够产生与我的Python/sklearn代码相同结果的东西。 - Rob
重新表达我的问题。我不仅想知道“正确的方法是什么”,还想知道“哪种方法可行?” - Rob

1
我遇到了同样的问题。我的数据是按“description_pretty”进行分区的,这就是我处理它的方式。我将数据框拆分为其分区,将其馈送到管道中,选择相关列,然后将其合并在一起。
    val pipe = new Pipeline().setStages(Array(encoder, assembler, 
         multivariate_linear_model))

    val descriptions_pretty = training_df.select("description_pretty").
         distinct.
         as[String].
         rdd.
         collect

    val model_predictions_df = descriptions_pretty.par.
         map(x => pipe.fit(training_df.filter($"description_pretty" === x)).
              transform(prediction_df.filter($"description_pretty" === x)).
              select($"description", $"description_pretty", 
              $"standard_event_date".cast("String"), 
              $"prediction".as("daily_peak_bps"))).
         reduce( _ union _)

你可以在 .transform 之前停下来,而是获取系数。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接