如何使用PySpark将向量拆分为列

70

背景:我有一个包含两列:单词和向量的DataFrame。其中"vector"列的数据类型为VectorUDT

示例:

word    |  vector
assert  | [435,323,324,212...]

我希望你能将其翻译成中文:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......
assert | 435 | 5435| 698| 356|....
如何使用PySpark将包含向量的列拆分为每个维度的多个列?谢谢。

请参考如何在Spark Dataframe中访问VectorUDT列的元素,以获得更好性能的解决方案。(我已经对两种方法进行了时间测试。如果我有足够的声望,我会将此标记为重复。) - hwrd
@hwrd,你能分享一下你所使用的基准测试代码吗?谢谢。 - 10465355
@user10465355 在下方添加了“解决方案”,因为它太大了无法在评论中显示。(该组织方式有些古怪,因为我从Jupyter笔记本中提取出来并替换了 %%timeit 单元格魔法命令。) - hwrd
4个回答

103

Spark >= 3.0.0

自Spark 3.0.0以后,可以不使用UDF来完成此操作。

from pyspark.ml.functions import vector_to_array

(df
    .withColumn("xs", vector_to_array("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

Spark < 3.0.0

一种可能的方法是将数据转换为RDD并从中进行操作:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...

## +-------+---+---+---+
## |   word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+

另一个解决方案是创建一个UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    # Important: asNondeterministic requires Spark 2.3 or later
    # It can be safely removed i.e.
    # return udf(to_array_, ArrayType(DoubleType()))(col)
    # but at the cost of decreased performance
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

(df
    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

有关Scala的等效方法,请参阅Spark Scala:如何将Dataframe [vector]转换为Dataframe [f1:Double,...,fn:Double)]


2
从性能方面来看,使用 .map/.toDF 函数要比 UDF 实现聪明得多,因为它们几乎总是比 UDF 实现更快。[除非您在使用 Spark 2.2+ 中的“矢量化UDF”定义。] - tmarthal
谢谢你的评论。RDD API 不会在某个时间点变为过时吗?所以我认为后者是推荐的,或者我错了吗? - user1972382
改进相关的 JIRA 工单链接:https://issues.apache.org/jira/browse/SPARK-19217 - pault
@zero323 - 对我来说,UDF的方式似乎已经奏效了。我该如何将结果保存为DF?当我在最后执行.show时,它以我想要的方式显示DF。只是似乎找不到命名/保存它的方法。 - Anonymous Person
我遇到了和@Chuck一样的问题,但是当创建df.toDF(["word", "vector"])时会发生这种情况。 - haneulkim
显示剩余3条评论

4

将在 PySpark ML 模型训练后生成的 rawPredictionprobability 列分割成 Pandas 列,您可以按照以下方式进行拆分:

your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))

1
我遇到了这个错误:TypeError: 'Column' 对象不可调用。 - Shubh

1

使用如何在Spark DataFrame中访问vector UDT列的元素中的第i个udf要快得多。

上面zero323提供的解决方案中给出的extract函数使用toList,它创建一个Python列表对象,用Python浮点对象填充它,通过遍历列表找到所需的元素,然后需要将其转换回java double;对每行重复。使用rdd比使用to_array udf慢得多,后者也调用toList,但两者都比让SparkSQL处理大部分工作的udf慢得多。

计时代码比较rdd extract和to_array udf与3955864中提出的i_th udf:

from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType

sc = SparkContext('local[4]', 'FlatTestTime')

spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)

from pyspark.ml.linalg import Vectors

# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, {1: 2}), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist(),) + (row.more,) + tuple(row.vorpal.toArray().tolist(),)

def test_extract():
    return df.rdd.map(extract).toDF(['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[{}]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array

# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
            else:
                names.append(col(fieldname))
        return df.withColumn(vector, to_array(col(vector)))\
                 .select(names)
    else:
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.append("more")
select.extend([ith("vorpal", lit(i)) for i in range(2)])

# %% timeit ...
def test_ith():
    return df.select(select)

if __name__ == '__main__':
    import timeit

    # make sure these work as intended
    test_ith().show(4)
    test_flatten().show(4)
    test_to_array().show(4)
    test_extract().show(4)

    print("i_th\t\t",
          timeit.timeit("test_ith()",
                       setup="from __main__ import test_ith",
                       number=7)
         )
    print("flatten\t\t",
          timeit.timeit("test_flatten()",
                       setup="from __main__ import test_flatten",
                       number=7)
         )
    print("to_array\t",
          timeit.timeit("test_to_array()",
                       setup="from __main__ import test_to_array",
                       number=7)
         )
    print("extract\t\t",
          timeit.timeit("test_extract()",
                       setup="from __main__ import test_extract",
                       number=7)
         )

结果:

i_th         0.05964796099999958
flatten      0.4842299350000001
to_array     0.42978780299999997
extract      2.9254476840000017

这是一些看起来很棒的代码,但它具体做什么呢?你能为使用的不同方法添加一些解释吗? - Chuck
将此处接受的解决方案中的两个 Spark < 3.0.0 函数与 https://dev59.com/VlkS5IYBdhLWcg3ws4ck 中提出的 ith 函数一起编写代码,以表明后者的解决方案更为优秀。--来源于用户https://stackoverflow.com/users/2254228/chuck - hwrd
1
你正在比较DAG构造而不是实际的转换。 - A.Eddine

0
def splitVecotr(df, new_features=['f1','f2']):
schema = df.schema
cols = df.columns

for col in new_features: # new_features should be the same length as vector column length
    schema = schema.add(col,DoubleType(),True)

return spark.createDataFrame(df.rdd.map(lambda row: [row[i] for i in cols]+row.features.tolist()), schema)

该函数将特征向量列转换为单独的列。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接