Pyspark数据帧插补 -- 根据指定条件用列平均值替换未知和缺失的值。

7
给定一个Spark数据框,我想基于该列的非缺失和非未知值计算出该列的均值。然后我想使用这个均值来替换该列的缺失和未知值。
例如,假设我正在处理以下内容:
- 名为df的数据框,每个记录表示一个人,所有列都是整数或数字 - 名为age(每个记录的年龄)的列 - 名为missing_age的列(如果该个体没有年龄,则等于1;否则为0) - 名为unknown_age的列(如果该个体的年龄未知,则等于1;否则为0)
那么我可以按照下面的方式计算这个均值。
calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0))
.agg(avg(col("age")))

或通过SQL和Windows函数,

mean_compute = hiveContext.sql("select avg(age) over() as mean from df 
where missing_age = 0 and unknown_age = 0")

我尽量避免使用 SQL / Windows 函数。我的挑战是使用非 SQL 方法,将这个 mean 替换为未知/缺失值。
我尝试过 when()、where()、replace()、withColumn、UDFs 等等,但无论我做什么,要么出现错误,要么结果与我期望的不一样。以下是我尝试过但不起作用的其中一个示例。
imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1),
calc_mean).otherwise("age")

我已经搜索了整个网络,但没有找到类似的归因类型问题,所以非常感谢任何帮助。这可能是我错过的非常简单的东西。
另外一件事——我正在尝试将此代码应用于Spark Dataframe中所有列,这些列的列名中没有unknown_或missing_。我可以在Python“for循环”中包装Spark相关代码,并遍历所有适用的列来完成此操作吗?
更新:
还发现如何循环遍历列...下面是一个示例。
for x in df.columns:
    if 'unknown_' not in x and 'missing_' not in x:
        avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
        df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1), 
        avg_compute).otherwise(df[x]))
1个回答

11
如果未知或缺失的年龄具有某个值:
from pyspark.sql.functions import col, avg, when

df = sc.parallelize([
    (10, 0, 0), (20, 0, 0), (-1, 1, 0), (-1, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])

avg_age = df.where(
    (col("unknown_age") != 1) & (col("missing_age") != 1)
).agg(avg("age")).first()[0]

df.withColumn("age_imp", when(
    (col("unknown_age") == 1) | (col("missing_age") == 1), avg_age
).otherwise(col("age")))
如果年龄为未知或缺失,则可以简化为以下内容:

如果年龄为 NULL,则可以简化为:

df = sc.parallelize([
    (10, 0, 0), (20, 0, 0), (None, 1, 0), (None, 0, 1)
]).toDF(["age", "missing_age", "unknown_age"])

df.na.fill(df.na.drop().agg(avg("age")).first()[0], ["age"])

非常感谢您!您的帮助让我这一周过得很愉快!我还学会了如何应用到所有列并更新了帖子。 - midnightfalcon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接