How to convert an RDD containing a SparseVector into a DataFrame with a Vector column

I have an RDD of (String, SparseVector) tuples, and I want to create a DataFrame from that RDD in order to get a DataFrame with the schema (label: string, features: vector) that most of the ml algorithm libraries require. I know it should be possible, because the HashingTF ml library outputs a vector when given the feature column of a DataFrame.
temp_df = sqlContext.createDataFrame(temp_rdd, StructType([
        StructField("label", DoubleType(), False),
        StructField("tokens", ArrayType(StringType()), False)
    ]))

# assuming there is an RDD of (double, array(strings))

hashingTF = HashingTF(numFeatures=COMBINATIONS, inputCol="tokens", outputCol="features")

ndf = hashingTF.transform(temp_df)
ndf.printSchema()

#outputs 
#root
#|-- label: double (nullable = false)
#|-- tokens: array (nullable = false)
#|    |-- element: string (containsNull = true)
#|-- features: vector (nullable = true)

So my question is, can I convert an RDD of (String, SparseVector) into a DataFrame of (String, vector)? I tried the usual sqlContext.createDataFrame, but no DataType fits my needs.
df = sqlContext.createDataFrame(rdd,StructType([
        StructField("label" , StringType(),True),
        StructField("features" , ?Type(),True)
    ]))
3 Answers


You need to use VectorUDT here:

# In Spark 1.x use the mllib imports instead:
# from pyspark.mllib.linalg import SparseVector, VectorUDT
from pyspark.ml.linalg import SparseVector, VectorUDT
from pyspark.sql.types import StructType, StructField, DoubleType

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()

## root
##  |-- label: double (nullable = true)
##  |-- features: vector (nullable = true)

For completeness, here is the Scala equivalent:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, StructType}
// In Spark 1.x
// import org.apache.spark.mllib.linalg.{Vectors, VectorUDT}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

val schema = new StructType()
  .add("label", DoubleType)
  // In Spark 1.x
  // .add("features", new VectorUDT())
  .add("features", VectorType)

val temp_rdd: RDD[Row]  = sc.parallelize(Seq(
  Row(0.0, Vectors.sparse(4, Seq((1, 1.0), (3, 5.5)))),
  Row(1.0, Vectors.sparse(4, Seq((0, -1.0), (2, 0.5))))
))

spark.createDataFrame(temp_rdd, schema).printSchema

// root
// |-- label: double (nullable = true)
// |-- features: vector (nullable = true)

Wow, I searched for this for so long and finally found it! I almost cried with joy :,) +1 - Alberto Bonsanto
This works! Thank you so much! Could you tell me where this is in the documentation? I can't find any VectorUDT in the linalg apache spark docs. - Orangel Marquez
@OrangelMarquez maybe a pull request is required. - Alberto Bonsanto
I don't know about the docs, but the Spark source code is a useful resource: https://github.com/apache/spark/blob/master/examples/src/main/python/ml/kmeans_example.py - zero323


Although the answer by @zero323 https://dev59.com/t1wY5IYBdhLWcg3wVGeB#32745924 makes sense, and I wished it had worked for me, the underlying dataset of the rdd still contained SparseVector types, so I had to do the following to convert it to the DenseVector type. If there is a shorter or better way, please let me know.

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, DoubleType
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT

temp_rdd = sc.parallelize([
    (0.0, SparseVector(4, {1: 1.0, 3: 5.5})),
    (1.0, SparseVector(4, {0: -1.0, 2: 0.5}))])

schema = StructType([
    StructField("label", DoubleType(), True),
    StructField("features", VectorUDT(), True)
])

temp_rdd.toDF(schema).printSchema()
df_w_ftr = temp_rdd.toDF(schema)

print('original conversion method: ', df_w_ftr.take(5))
print('\n')
# expand each SparseVector into a DenseVector via toArray()
temp_rdd_dense = temp_rdd.map(lambda x: Row(label=x[0], features=DenseVector(x[1].toArray())))
print(type(temp_rdd_dense), type(temp_rdd))
print('using map and toArray:', temp_rdd_dense.take(5))

temp_rdd_dense.toDF().show()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)

original conversion method:  [Row(label=0.0, features=SparseVector(4, {1: 1.0, 3: 5.5})), Row(label=1.0, features=SparseVector(4, {0: -1.0, 2: 0.5}))]


<class 'pyspark.rdd.PipelinedRDD'> <class 'pyspark.rdd.RDD'>
using map and toArray: [Row(features=DenseVector([0.0, 1.0, 0.0, 5.5]), label=0.0), Row(features=DenseVector([-1.0, 0.0, 0.5, 0.0]), label=1.0)]

+------------------+-----+
|          features|label|
+------------------+-----+
| [0.0,1.0,0.0,5.5]|  0.0|
|[-1.0,0.0,0.5,0.0]|  1.0|
+------------------+-----+
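
For intuition, the DenseVector rows above are just the sparse entries expanded into a full-length array. What `toArray()` performs can be sketched in plain Python (an illustrative helper, not part of Spark's API):

```python
def sparse_to_dense(size, entries):
    """Expand {index: value} sparse entries into a dense list of length `size`."""
    dense = [0.0] * size
    for i, v in entries.items():
        dense[i] = v
    return dense

print(sparse_to_dense(4, {1: 1.0, 3: 5.5}))   # [0.0, 1.0, 0.0, 5.5]
print(sparse_to_dense(4, {0: -1.0, 2: 0.5}))  # [-1.0, 0.0, 0.5, 0.0]
```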


Here is a Scala example for Spark 2.1:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.linalg.Vector

def featuresRDD2DataFrame(features: RDD[Vector]): DataFrame = {
  import sparkSession.implicits._
  val rdd: RDD[(Double, Vector)] = features.map(x => (0.0, x))
  val df = rdd.toDF("label", "features").select("features")
  df
}

Without the implicits import, the compiler did not recognize toDF() on the features RDD.

