Context: I have a DataFrame with two columns, word and vector, where the data type of the "vector" column is VectorUDT.

An example:

word | vector
assert | [435,323,324,212...]

And I want to get this:

word | v1 | v2 | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....

How do I split a column containing a vector into several columns, one per dimension, using PySpark? Thanks.
Spark >= 3.0.0
Since Spark 3.0.0 this can be done without using a UDF.
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
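If you also want the v1, v2, ... column names from the question, each extracted element can be aliased; a minimal sketch, assuming a three-dimensional vector (adjust the range; the v1/v2/... names are only illustrative):

from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

(df
    .withColumn("xs", vector_to_array("vector"))
    # alias each array element so the output columns are named v1, v2, v3
    .select(["word"] + [col("xs")[i].alias("v{}".format(i + 1)) for i in range(3)]))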
Spark < 3.0.0
One possible approach is to convert to and from an RDD:
from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...
## +-------+---+---+---+
## | word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+
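If the vector length is known up front, the columns can also be named explicitly when rebuilding the DataFrame instead of accepting the default _2, _3, ...; a small sketch assuming three dimensions (the v1/v2/v3 names are only illustrative):

df.rdd.map(extract).toDF(["word", "v1", "v2", "v3"])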
Another solution is to create a UDF:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))
## +-------+-----+-----+-----+
## | word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert| 1.0| 2.0| 3.0|
## |require| 0.0| 2.0| 0.0|
## +-------+-----+-----+-----+
For the Scala equivalent, see Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn:Double)].
Using the .map/.toDF functions is a much smarter choice than the UDF implementation, since they will almost always be faster than a UDF. [Unless you are using a "vectorized UDF" definition in Spark 2.2+.] – tmarthal

… this happens when .toDF(["word", "vector"]) is called. – haneulkim

To split the rawPrediction or probability column produced after training a PySpark ML model into Pandas columns, you can split it like this:
your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))
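A minimal end-to-end sketch of that idea; predictions stands in for the DataFrame returned by model.transform(...), the p0/p1/... names are only illustrative, and toPandas() of course collects everything to the driver:

import pandas as pd

# assumption: `predictions` is a small Spark DataFrame with a VectorUDT "probability" column
pdf = predictions.select("word", "probability").toPandas()

# expand the vector in "probability" into one pandas column per class
prob = pdf["probability"].apply(lambda v: pd.Series(v.toArray()))
prob.columns = ["p{}".format(i) for i in range(prob.shape[1])]

pdf = pd.concat([pdf.drop(columns=["probability"]), prob], axis=1)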
It is much faster to use the i_th udf from How to access element of a VectorUDT column in a Spark DataFrame?

The extract function given in zero323's solution above uses toList, which creates a Python list object, populates it with Python float objects, finds the desired element by traversing the list, and then needs to convert it back to a Java double; repeated for each row. Using the rdd is much slower than the to_array udf, which also calls toList, but both are much slower than a udf that lets SparkSQL handle most of the work.

Timing code comparing the rdd extract and the to_array udf proposed here to the i_th udf from 3955864:
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType
sc = SparkContext('local[4]', 'FlatTestTime')
spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)
from pyspark.ml.linalg import Vectors
# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000
df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, {1: 2}), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])
def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist(),) + (row.more,) + tuple(row.vorpal.toArray().tolist(),)

def test_extract():
    return df.rdd.map(extract).toDF(['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[{}]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array
# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
            else:
                names.append(col(fieldname))
        return df.withColumn(vector, to_array(col(vector)))\
                 .select(names)
    else:
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2
def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None
ith = udf(ith_, DoubleType())
select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.append("more")
select.extend([ith("vorpal", lit(i)) for i in range(2)])
# %% timeit ...
def test_ith():
    return df.select(select)
if __name__ == '__main__':
    import timeit

    # make sure these work as intended
    test_ith().show(4)
    test_flatten().show(4)
    test_to_array().show(4)
    test_extract().show(4)

    print("i_th\t\t",
          timeit.timeit("test_ith()",
                        setup="from __main__ import test_ith",
                        number=7)
          )
    print("flatten\t\t",
          timeit.timeit("test_flatten()",
                        setup="from __main__ import test_flatten",
                        number=7)
          )
    print("to_array\t",
          timeit.timeit("test_to_array()",
                        setup="from __main__ import test_to_array",
                        number=7)
          )
    print("extract\t\t",
          timeit.timeit("test_extract()",
                        setup="from __main__ import test_extract",
                        number=7)
          )
Results:
i_th 0.05964796099999958
flatten 0.4842299350000001
to_array 0.42978780299999997
extract 2.9254476840000017
… writing the code together with the ith function shows that the latter solution is superior. -- from user https://stackoverflow.com/users/2254228/chuck – hwrd

from pyspark.sql.types import DoubleType

def splitVecotr(df, new_features=['f1','f2']):
    schema = df.schema
    cols = df.columns

    for col in new_features:  # new_features should be the same length as vector column length
        schema = schema.add(col, DoubleType(), True)

    return spark.createDataFrame(df.rdd.map(lambda row: [row[i] for i in cols] + row.features.tolist()), schema)
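Hypothetical usage sketch, assuming an active SparkSession named spark and that the vector column is literally called features (the lambda above reads row.features):

from pyspark.ml.linalg import Vectors

df = spark.createDataFrame(
    [("assert", Vectors.dense([1.0, 2.0]))],
    ["word", "features"])

splitVecotr(df, new_features=["f1", "f2"]).show()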