给定一个Spark数据框,我想基于该列的非缺失和非未知值计算出该列的均值。然后我想使用这个均值来替换该列的缺失和未知值。
例如,假设我正在处理以下内容:
- 名为df的数据框,每个记录表示一个人,所有列都是整数或数字 - 名为age(每个记录的年龄)的列 - 名为missing_age的列(如果该个体没有年龄,则等于1;否则为0) - 名为unknown_age的列(如果该个体的年龄未知,则等于1;否则为0)
那么我可以按照下面的方式计算这个均值。
我尽量避免使用 SQL / Windows 函数。我的挑战是使用非 SQL 方法,将这个 mean 替换为未知/缺失值。
我尝试过 when()、where()、replace()、withColumn、UDFs 等等,但无论我做什么,要么出现错误,要么结果与我期望的不一样。以下是我尝试过但不起作用的其中一个示例。
我已经搜索了整个网络,但没有找到类似的归因类型问题,所以非常感谢任何帮助。这可能是我错过的非常简单的东西。
另外一件事——我正在尝试将此代码应用于Spark Dataframe中所有列,这些列的列名中没有unknown_或missing_。我可以在Python“for循环”中包装Spark相关代码,并遍历所有适用的列来完成此操作吗?
更新:
还发现如何循环遍历列...下面是一个示例。
例如,假设我正在处理以下内容:
- 名为df的数据框,每个记录表示一个人,所有列都是整数或数字 - 名为age(每个记录的年龄)的列 - 名为missing_age的列(如果该个体没有年龄,则等于1;否则为0) - 名为unknown_age的列(如果该个体的年龄未知,则等于1;否则为0)
那么我可以按照下面的方式计算这个均值。
calc_mean = df.where((col("unknown_age") == 0) & (col("missing_age") == 0))
.agg(avg(col("age")))
或通过SQL和Windows函数,
mean_compute = hiveContext.sql("select avg(age) over() as mean from df
where missing_age = 0 and unknown_age = 0")
我尽量避免使用 SQL / Windows 函数。我的挑战是使用非 SQL 方法,将这个 mean 替换为未知/缺失值。
我尝试过 when()、where()、replace()、withColumn、UDFs 等等,但无论我做什么,要么出现错误,要么结果与我期望的不一样。以下是我尝试过但不起作用的其中一个示例。
imputed = df.when((col("unknown_age") == 1) | (col("missing_age") == 1),
calc_mean).otherwise("age")
我已经搜索了整个网络,但没有找到类似的归因类型问题,所以非常感谢任何帮助。这可能是我错过的非常简单的东西。
另外一件事——我正在尝试将此代码应用于Spark Dataframe中所有列,这些列的列名中没有unknown_或missing_。我可以在Python“for循环”中包装Spark相关代码,并遍历所有适用的列来完成此操作吗?
更新:
还发现如何循环遍历列...下面是一个示例。
for x in df.columns:
if 'unknown_' not in x and 'missing_' not in x:
avg_compute = df.where(df['missing_' + x] != 1).agg(avg(x)).first()[0]
df = df.withColumn(x + 'mean_miss_imp', when((df['missing_' + x] == 1),
avg_compute).otherwise(df[x]))