您面临的问题可以分为以下几个部分:
- 将您的评级(我相信是)转换为带标签的数据X。
- 将X以libsvm格式保存。
1. 将您的评级转换为带标签的数据X
让我们考虑以下原始评级:
val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
您可以将这些原始评分处理成一种
坐标列表矩阵(COO)。
Spark实现了一种由其条目的RDD支持的分布式矩阵:
CoordinateMatrix
,其中每个条目都是(i: Long, j: Long, value: Double)元组。
注意:仅当矩阵的两个维度都很大且矩阵非常稀疏时才应使用坐标列表矩阵(COO)。(通常是用户/项目评级的情况。)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val data: RDD[MatrixEntry] =
sc.parallelize(rawRatings).map {
line => {
val fields = line.split(",")
val i = fields(0).toLong
val j = fields(1).toLong
val value = fields(2).toDouble
MatrixEntry(i, j, value)
}
}
现在让我们将那个 RDD[MatrixEntry]
转换为一个 CoordinateMatrix
并提取索引行:
val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
.toIndexedRowMatrix().rows // Extract indexed rows
.toDF("label", "features") // Convert rows
2. 将LabeledPoint数据以libsvm格式保存
自从Spark 2.0以来,您可以使用DataFrameWriter
完成此操作。让我们创建一个带有一些虚假的LabeledPoint数据的小例子(您也可以使用之前创建的DataFrame
):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
很遗憾,我们仍然无法直接使用 DataFrameWriter
,因为尽管大多数的管道组件支持向后兼容的加载,但是在Spark 2.0之前的一些现有DataFrames和管道中包含的向量或矩阵列可能需要迁移到新的spark.ml向量和矩阵类型。
转换DataFrame列从mllib.linalg
到ml.linalg
类型(反之亦然)的实用程序可以在org.apache.spark.mllib.util.MLUtils
中找到。 在我们的情况下,我们需要执行以下操作(对于虚拟数据和step 1.
中的DataFrame
)
import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)
现在让我们保存DataFrame:
convertedVecDF.write.format("libsvm").save("data/foo")
我们可以检查文件的内容:
$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0
编辑:
在当前版本的Spark(2.1.0)中,无需使用mllib
包。您可以像下面这样简单地将LabeledPoint
数据保存为libsvm格式:
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")