PySpark:一步计算平均值、标准差和围绕平均值的那些值

4

我的原始数据以表格形式呈现,包含来自不同变量的观察值。每个观察值都有变量名称、时间戳和该时间点的值。

变量 [字符串]、时间 [日期时间]、值 [浮点数]

数据以Parquet格式存储在HDFS中,并加载到Spark Dataframe(df)中。从这个dataframe中。

现在我想为每个变量计算默认的统计信息,如均值、标准差和其他信息。之后,一旦获取了均值,我想要过滤/计算那些接近均值的变量值的数量。

由于对我的其他问题的回答,我想出了以下代码:

from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")

def stddev_pop_w(col, w):
    #Built-in stddev doesn't support windowing
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))

def isInRange(value, mean, stddev, radius):
    try:
        if (abs(value - mean) < radius * stddev):
            return 1
        else:
            return 0
    except AttributeError:
        return -1

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
#f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
#f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())

df_ = df_all \
    .withColumn("mean", mean("Value").over(w1)) \
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
    .withColumn("delta", delta) \
#    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
#    .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \

#df2.show(5, False)
问题: 最后两行注释不起作用。它会产生一个AttributeError,因为标准差和平均值的输入值为空。我猜这是因为我在引用那些只是在计算中生成并在此时没有值的列。但是有没有一种方法可以实现这个功能?

目前,我正在进行第二次运行,如下所示:

df = df_.select("*", \
    abs(df_.Value - df_.mean).alias("max_deviation_mean"), \
    when(abs(df_.Value - df_.mean) < 2 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_2"), \
    when(abs(df_.Value - df_.mean) < 3 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_3"))
2个回答

5
解决方案是使用DataFrame.aggregateByKey函数,该函数在将聚合传播到计算节点之前会对每个分区和节点的值进行聚合。然后,在计算节点上将它们组合成一个结果值。
伪代码如下。受该教程启发,但它使用StatCounter的两个实例,尽管我们同时汇总了两个不同的统计信息:
from pyspark.statcounter import StatCounter
# value[0] is the timestamp and value[1] is the float-value
# we are using two instances of StatCounter to sum-up two different statistics

def mergeValues(s1, v1, s2, v2):
    s1.merge(v1)
    s2.merge(v2)
    return

def combineStats(s1, s2):
    s1[0].mergeStats(s2[0])
    s1[1].mergeStats(s2[1])
    return
(df.aggregateByKey((StatCounter(), StatCounter()),
        (lambda s, values: mergeValues(s[0], values[0], s[1], values[1]),
        (lambda s1, s2: combineStats(s1, s2))
    .mapValues(lambda s: (  s[0].min(), s[0].max(), s[1].max(), s[1].min(), s[1].mean(), s[1].variance(), s[1].stddev,() s[1].count()))
    .collect())

2

这是行不通的,因为当你执行

from pyspark.sql.functions import *

您在使用内置的abs函数时,应该使用pyspark.sql.functions.abs函数,并将列作为输入参数,而不是本地Python值。

此外,您创建的UDF不能处理NULL条目。

  • Don't use import * unless you're aware of what exactly is imported. Instead alias

    from pyspark.sql.functions import abs as abs_
    

    or import module

    from pyspark.sql import functions as sqlf
    
    sqlf.col("x")
    
  • Always check input inside UDF or even better avoid UDFs unless necessary.


你是说只要我改变导入,它就能工作? - Matthias
我想说的是,这个问题以及缺乏NULL/None处理是明显的问题。可能还存在其他问题。 - zero323

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接