我的原始数据以表格形式呈现,包含来自不同变量的观察值。每个观察值都有变量名称、时间戳和该时间点的值。
变量 [字符串]、时间 [日期时间]、值 [浮点数]
数据以Parquet格式存储在HDFS中,并加载到Spark Dataframe(df)中。从这个dataframe中。
现在我想为每个变量计算默认的统计信息,如均值、标准差和其他信息。之后,一旦获取了均值,我想要过滤/计算那些接近均值的变量值的数量。
由于对我的其他问题的回答,我想出了以下代码:
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
#f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
#f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df_ = df_all \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta) \
# .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
# .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
#df2.show(5, False)
问题: 最后两行注释不起作用。它会产生一个AttributeError,因为标准差和平均值的输入值为空。我猜这是因为我在引用那些只是在计算中生成并在此时没有值的列。但是有没有一种方法可以实现这个功能?
目前,我正在进行第二次运行,如下所示:
df = df_.select("*", \
abs(df_.Value - df_.mean).alias("max_deviation_mean"), \
when(abs(df_.Value - df_.mean) < 2 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_2"), \
when(abs(df_.Value - df_.mean) < 3 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_3"))