如何在Spark DataFrame中访问VectorUDT列的元素?

27

我有一个名为df的数据框,其中有一个名为featuresVectorUDT列。如何获取该列的一个元素,例如第一个元素?

我尝试过以下方法:

from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()

但我遇到了一个net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype)的错误。如果我使用first_elem_udf = first_elem_udf(lambda row: row.toArray()[0]),也会遇到同样的错误。

我还尝试过使用explode()函数,但是因为它需要数组或映射类型而出现了错误。

我认为这应该是一个常见的操作。

5个回答

28

将输出转换为 float

from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

使用示例:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    (1, Vectors.dense([1, 2, 3])),
    (2, Vectors.sparse(3, [1], [9]))
]).toDF(["id", "features"])

df.select(ith("features", lit(1))).show()

## +-----------------+
## |ith_(features, 1)|
## +-----------------+
## |              2.0|
## |              9.0|
## +-----------------+

解释:

输出值必须重新序列化为等效的Java对象。如果您想要访问values(注意SparseVectors),则应使用item方法:

v.values.item(0)
返回标准的Python标量。同样,如果您想要以密集结构访问所有值:
v.toArray().tolist()

我收到了 Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 的错误信息。有什么线索吗? - haneulkim

3
如果您更喜欢使用spark.sql,可以使用以下自定义函数“to_array”将向量转换为数组,然后您可以将其作为数组进行操作。
 from pyspark.sql.types import ArrayType, DoubleType
 def to_array_(v):
        return v.toArray().tolist()
 from pyspark.sql import SQLContext
 sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None) 
 sqlContext.udf.register("to_array",to_array_,  ArrayType(DoubleType()))

示例

    from pyspark.ml.linalg import Vectors
    
    df = sc.parallelize([
        (1, Vectors.dense([1, 2, 3])),
        (2, Vectors.sparse(3, [1], [9]))
    ]).toDF(["id", "features"])
    
    df.createOrReplaceTempView("tb")
    
    spark.sql("""select * , to_array(features)[1] Second from  tb   """).toPandas()

输出

    id  features    Second
0   1   [1.0, 2.0, 3.0] 2.0
1   2   (0.0, 9.0, 0.0) 9.0

2
我遇到了与无法使用explode()相同的问题。你可以使用pyspark.ml.feature库中的VectorSlice。像这样:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

slicer = VectorSlicer(inputCol="features", outputCol="features_one", indices=[0])

output = slicer.transform(df)

output.select("features", "features_one").show()

我最喜欢这个解决方案,但它仍然导致“features_one”列成为一个包含1个元素的列表。 - Dr. Andrew
我有同样的问题。有没有快速提取第一个元素的方法?另外,我们能否编写管道来从向量中“爆炸”多个元素? - Shi Chen

1

对于任何试图将 PySpark ML 模型训练后生成的概率列拆分为可用列的人来说,这里提供一种不使用 UDF 或 numpy 的方法。并且这仅适用于二元分类。这里的 lr_pred 是包含逻辑回归模型预测结果的数据框。

prob_df1=lr_pred.withColumn("probability",lr_pred["probability"].cast("String"))

prob_df =prob_df1.withColumn('probabilityre',split(regexp_replace("probability", "^\[|\]", ""), ",")[1].cast(DoubleType()))

0

请不要使用粗言秽语。 - Chris

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接