Scala Spark - 在Spark DataFrame中将向量列拆分为单独的列

9

我有一个Spark DataFrame,其中有一列包含向量值。这些向量值都是n维的,即长度相同。我还有一个列名列表Array("f1", "f2", "f3", ..., "fn"),每个元素对应于向量中的一个元素。

some_columns... | Features
      ...       | [0,1,0,..., 0]

to

some_columns... | f1 | f2 | f3 | ... | fn

      ...       | 0  | 1  | 0  | ... | 0

什么是实现这个的最佳方式?我想到了一种方法,就是使用createDataFrame(Row(Features), featureNameList)创建一个新的 DataFrame,然后与旧 DataFrame 进行连接,但这需要 spark context 来使用 createDataFrame。 我只想转换现有的数据框。我也知道用.withColumn("fi", value),但如果 n 很大怎么办?
我是 Scala 和 Spark 的新手,找不到任何好的示例。我认为这可能是一个常见的任务。我的特殊情况是我使用了 CountVectorizer,希望为了更好的可读性而单独恢复每个列,而不仅仅是向量结果。
1个回答

20

一种方法是将vector列转换为一个array<double>,然后使用getItem来提取单个元素。

import org.apache.spark.sql.functions._
import org.apache.spark.ml._

val df = Seq( (1 , linalg.Vectors.dense(1,0,1,1,0) ) ).toDF("id", "features")
//df: org.apache.spark.sql.DataFrame = [id: int, features: vector]

df.show
//+---+---------------------+
//|id |features             |
//+---+---------------------+
//|1  |[1.0,0.0,1.0,1.0,0.0]|
//+---+---------------------+

// A UDF to convert VectorUDT to ArrayType
val vecToArray = udf( (xs: linalg.Vector) => xs.toArray )

// Add a ArrayType Column   
val dfArr = df.withColumn("featuresArr" , vecToArray($"features") )

// Array of element names that need to be fetched
// ArrayIndexOutOfBounds is not checked.
// sizeof `elements` should be equal to the number of entries in column `features`
val elements = Array("f1", "f2", "f3", "f4", "f5")

// Create a SQL-like expression using the array 
val sqlExpr = elements.zipWithIndex.map{ case (alias, idx) => col("featuresArr").getItem(idx).as(alias) }

// Extract Elements from dfArr    
dfArr.select(sqlExpr : _*).show
//+---+---+---+---+---+
//| f1| f2| f3| f4| f5|
//+---+---+---+---+---+
//|1.0|0.0|1.0|1.0|0.0|
//+---+---+---+---+---+

谢谢你的回答!这真的很有帮助。你能否添加最后一步,即将原始df获取新的单独列,即生成具有id(实际上不仅仅是id,应该是所有其他现有列)和f1,f2...列的df。这样,df就可以在原地修改。这对你来说可能很明显,但我想学习正确的方法,因为我还不太熟悉Scala。 - Logan Yang
2
你可以使用 dfArr.select( (col("id") +: sqlExpr) :_*).show(false)。这将把列 id 前置到 sqlExpr 数组中,然后将其传递给 select 函数。同时请注意,没有进行原地更改。dfArr.select( (col("id") +: sqlExpr) :_*) 将返回一个新的数据框。因为数据框是不可变的,所以 df 仍将保留原始内容。 - philantrovert
谢谢指出!我也意识到不应该进行原地更改。 - Logan Yang
一个自我提醒:要将所有先前的列相加,可以使用 dfArr.select( (col("*") +: sqlExpr) :_*) - Logan Yang

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接