scala> spark.version
res8: String = 2.2.0
我正在处理一个包含列locationID
的spark Dataframe。我已经创建了一个MLlib管道来构建线性回归模型,并且当我为单个locationID
提供数据时,它可以正常工作。我现在想为每个'locationID'创建多个模型(生产中可能有几千个locationID)。我想保存每个模型的系数。
我不确定如何在Scala中完成这项任务。
我的管道定义如下:
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql
// Load the regression input data
val mydata = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("./inputdata.csv")
// Crate month one hot encoding
val monthIndexer = new StringIndexer()
.setInputCol("month")
.setOutputCol("monthIndex").fit(mydata)
val monthEncoder = new OneHotEncoder()
.setInputCol(monthIndexer.getOutputCol)
.setOutputCol("monthVec")
val assembler = new VectorAssembler()
.setInputCols(Array("monthVec","tran_adr"))
.setOutputCol("features")
val lr = new LinearRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)
val pipeline = new Pipeline()
.setStages(Array(monthIndexer, monthEncoder, assembler, lr))
// Fit using the model pipeline
val myPipelineModel = pipeline.fit(mydata)
我可以这样获取模型的详细信息:
val modelExtract = myPipelineModel.stages(3).asInstanceOf[LinearRegressionModel]
println(s"Coefficients: ${modelExtract.coefficients} Intercept: ${modelExtract.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = modelExtract.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
现在我想按
mydata
中的locationID
列分组,并在数据的每个分区上运行流水线。
我尝试使用groupby,但我只能进行聚合。
val grouped = mydata.groupBy("locationID")
我也尝试过将唯一的locationID
作为列表提取并循环遍历:
val locationList = mydata.select(mydata("prop_code")).distinct
locationList.foreach { printLn }
我知道Spark不适合创建许多较小的模型,最适合在大型数据集上创建一个模型,但作为概念验证,我被要求这样做。
在Spark中,如何正确处理这样的任务?