在SPARK Dataframe中对列进行缩放(归一化)- Pyspark

19

我正在尝试使用Python对SPARK DataFrame中的一列进行归一化处理。

我的数据集:

--------------------------
userID|Name|Revenue|No.of.Days|
--------------------------
1      A     12560    45
2      B     2312890  90
.      .       .       .
.      .       .       .
.      .       .       .
--------------------------

在这个数据集中,除了userID和Name以外,我需要对Revenue和No.of Days进行归一化。

输出应该如下所示


userID|Name|Revenue|No.of.Days|
--------------------------
1      A     0.5     0.5
2      B     0.9       1
.      .       1     0.4
.      .     0.6       .
.      .       .       .
--------------------------

用于计算或归一化每列值的公式为:

val = (ei-min)/(max-min)
ei = column value at i th position
min = min value in that column
max = max value in that column

我该如何使用PySpark轻松实现此操作?


请添加一个简单的输入和预期输出示例。 - Yaron
@JackDaniel,你找到解决方案了吗?我也遇到了类似的问题。 - thetna
1个回答

35
希望以下代码能够满足您的需求。

代码:

df = spark.createDataFrame([ (1, 'A',12560,45),
                             (1, 'B',42560,90),
                             (1, 'C',31285,120),
                             (1, 'D',10345,150)
                           ], ["userID", "Name","Revenue","No_of_Days"])

print("Before Scaling :")
df.show(5)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())

# Iterating over columns to be scaled
for i in ["Revenue","No_of_Days"]:
    # VectorAssembler Transformation - Converting column to vector type
    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

    # MinMaxScaler Transformation
    scaler = MinMaxScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

    # Pipeline of VectorAssembler and MinMaxScaler
    pipeline = Pipeline(stages=[assembler, scaler])

    # Fitting pipeline on dataframe
    df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")

print("After Scaling :")
df.show(5)

输出:

Output


2
对于DataFrame的所有列进行归一化,甚至可能有数千列?迭代要缩放的列仍然很慢。 - rosefun
3
非常棒的答案。但是,对于任何在缩放后使用KMeans()的人来说,出于某种奇怪的原因,如果我不将数据类型留为vector,它会抛出错误。使用StandardScaler() + VectorAssembler()+ KMeans()需要矢量类型。即使使用VectorAssembler将其转换为向量;如果我使用float-> vector而不是vector-> vector,则始终会提示我特征向量中有na/null值。 - kevin_theinfinityfund

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接