如何从DataFrame准备数据为LibSVM格式?

18

我想制作libsvm格式,所以我将数据框转换为所需的格式,但我不知道如何将其转换为libsvm格式。格式如图所示。我希望所需的libsvm类型为user item:rating。如果您知道如何处理当前情况:

val ratings = sc.textFile(new File("/user/ubuntu/kang/0829/rawRatings.csv").toString).map { line =>
     val fields = line.split(",")
      (fields(0).toInt,fields(1).toInt,fields(2).toDouble)
}
val user = ratings.map{ case (user,product,rate) => (user,(product.toInt,rate.toDouble))}
val usergroup = user.groupByKey 

val data =usergroup.map{ case(x,iter) => (x,iter.map(_._1).toArray,iter.map(_._2).toArray)}

val data_DF = data.toDF("user","item","rating")

数据框图

我正在使用Spark 2.0。

3个回答

19

您面临的问题可以分为以下几个部分:

  • 将您的评级(我相信是)转换为带标签的数据X
  • 将X以libsvm格式保存。

1. 将您的评级转换为带标签的数据X

让我们考虑以下原始评级:

val rawRatings: Seq[String] = Seq("0,1,1.0", "0,3,3.0", "1,1,1.0", "1,2,0.0", "1,3,3.0", "3,3,4.0", "10,3,4.5")
您可以将这些原始评分处理成一种坐标列表矩阵(COO)
Spark实现了一种由其条目的RDD支持的分布式矩阵:CoordinateMatrix,其中每个条目都是(i: Long, j: Long, value: Double)元组。 注意:仅当矩阵的两个维度都很大且矩阵非常稀疏时才应使用坐标列表矩阵(COO)。(通常是用户/项目评级的情况。)
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

val data: RDD[MatrixEntry] = 
      sc.parallelize(rawRatings).map {
            line => {
                  val fields = line.split(",")
                  val i = fields(0).toLong
                  val j = fields(1).toLong
                  val value = fields(2).toDouble
                  MatrixEntry(i, j, value)
            }
      }

现在让我们将那个 RDD[MatrixEntry] 转换为一个 CoordinateMatrix 并提取索引行:

val df = new CoordinateMatrix(data) // Convert the RDD to a CoordinateMatrix
                .toIndexedRowMatrix().rows // Extract indexed rows
                .toDF("label", "features") // Convert rows

2. 将LabeledPoint数据以libsvm格式保存

自从Spark 2.0以来,您可以使用DataFrameWriter完成此操作。让我们创建一个带有一些虚假的LabeledPoint数据的小例子(您也可以使用之前创建的DataFrame):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg,pos).toDF("label","features")

很遗憾,我们仍然无法直接使用 DataFrameWriter,因为尽管大多数的管道组件支持向后兼容的加载,但是在Spark 2.0之前的一些现有DataFrames和管道中包含的向量或矩阵列可能需要迁移到新的spark.ml向量和矩阵类型。

转换DataFrame列从mllib.linalgml.linalg类型(反之亦然)的实用程序可以在org.apache.spark.mllib.util.MLUtils中找到。 在我们的情况下,我们需要执行以下操作(对于虚拟数据和step 1.中的DataFrame

import org.apache.spark.mllib.util.MLUtils
// convert DataFrame columns
val convertedVecDF = MLUtils.convertVectorColumnsToML(df)

现在让我们保存DataFrame:

convertedVecDF.write.format("libsvm").save("data/foo")

我们可以检查文件的内容:

$ cat data/foo/part*
0.0 1:1.0 3:3.0
1.0 1:1.0 2:0.0 3:3.0

编辑: 在当前版本的Spark(2.1.0)中,无需使用mllib包。您可以像下面这样简单地将LabeledPoint数据保存为libsvm格式:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.LabeledPoint
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

val df = Seq(neg,pos).toDF("label","features")
df.write.format("libsvm").save("data/foo")

我不确定我理解你的评论@big_mike_boiii。 - eliasah
是的,但有没有办法从仅包含行的DataFrame创建一个带标签点的DataFrame。Spark不允许在DataFrame上使用map。 - uh_big_mike_boi
@big_mike_boiii 你可以使用Dataset,或者如果卡住了,你总是可以退回到rdd,然后映射并转换为DF。 - eliasah
我认为你不能那么轻易地使用Dataset。而且我一直试图避免使用RDD。现在,我只能在每个字段上映射.asInstanceOf[Double]来使用Dataset,这有点像是一个hack。 - uh_big_mike_boi
让我们在聊天中继续这个讨论。链接:https://chat.stackoverflow.com/rooms/189392/discussion-between-eliasah-and-big-mike-boiii。 - eliasah
显示剩余7条评论

1
为了将现有的DataSet转换为类型化的DataSet,我建议采用以下步骤:使用以下案例类:
case class LibSvmEntry (
   value: Double,
   features: L.Vector)

您可以使用map函数将其转换为LibSVM条目,如下所示:df.map[LibSvmEntry](r: Row => /*在此处执行操作*/)

0

libsvm数据类型的特征是一个稀疏向量,您可以使用pyspark.ml.linalg.SparseVector来解决这个问题。

a = SparseVector(4, [1, 3], [3.0, 4.0])

def sparsevecfuc(len,index,score):
    """
     args: len int, index array, score array
    """
    return SparseVector(len,index,score)
trans_sparse = udf(sparsevecfuc,VectorUDT())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接