How to convert an RDD of dense vectors into a DataFrame in PySpark?

I have a DenseVector RDD like this:

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

I want to convert this into a DataFrame. I tried the following:

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

It gives an error like this:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

Old solution:

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

EDIT 1 - Reproducible code

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", r"\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

2 Answers

You cannot convert an RDD[Vector] directly. It should be mapped to an RDD of objects that can be interpreted as structs, for example RDD[Tuple[Vector]]:

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

Otherwise Spark will try to convert the object's __dict__ and use an unsupported NumPy array as a field:

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

versus:

_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))

Notes:

  • In Spark 2.0 you have to use the correct local types: pyspark.ml.linalg when working with the DataFrame-based pyspark.ml API, and pyspark.mllib.linalg when working with the RDD-based pyspark.mllib API.

    These two namespaces are no longer compatible and require explicit conversions (for example How to convert from org.apache.spark.mllib.linalg.VectorUDT to ml.linalg.VectorUDT); a minimal conversion sketch follows.
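
    For example (a minimal sketch; asML() and Vectors.fromML() are available starting with Spark 2.0):

from pyspark.mllib.linalg import Vectors as MLlibVectors

old_vec = MLlibVectors.dense([1.0, 2.0, 3.0])  # pyspark.mllib.linalg.DenseVector
new_vec = old_vec.asML()                       # pyspark.ml.linalg.DenseVector (Spark 2.0+)
back = MLlibVectors.fromML(new_vec)            # back to the mllib type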

  • The code provided in the edit is not equivalent to the code in the original question. You should be aware that tuple and list do not have the same semantics. If you map a vector to a pair, use a tuple and convert directly to a DataFrame:

tfidf.rdd.map(
    lambda row: (row[0], DenseVector(row[1].toArray()))
).toDF()

Using tuples (product types) would work for nested structures as well, but I doubt this is what you want:

(tfidf.rdd
    .map(lambda row: (row[0], DenseVector(row[1].toArray())))
    .map(lambda x: (x, ))
    .toDF())

  • A list that is not the top-level row is interpreted as an ArrayType, so using a UDF for the conversion is cleaner (see Spark Python: standard scaler error "Do not support ... SparseVector"); a sketch of such a UDF follows.
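
For example (a minimal sketch; the DataFrame df and its column names are assumed here, not taken from the question):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# assuming df has an array<double> column named "rawfeatures";
# wrap each array in a proper ml vector
as_vector = udf(lambda xs: Vectors.dense(xs), VectorUDT())
df = df.withColumn("features", as_vector("rawfeatures"))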


I think the problem here is that createDataFrame does not accept a DenseVector as an argument. Try converting the DenseVector to the corresponding collection (i.e. an Array or List). In Scala and Java the toArray() method is available to convert a DenseVector into an array or list, and then try creating the DataFrame.
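
A minimal sketch of that suggestion (assuming frequencyDenseVectors is the original RDD of DenseVectors; in PySpark a DenseVector also has toArray(), which returns a NumPy array that tolist() turns into a plain Python list):

# each row becomes a one-field tuple holding a plain list (array<double> column)
rows = frequencyDenseVectors.map(lambda v: (v.toArray().tolist(), ))
spark.createDataFrame(rows, ["rawfeatures"])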
