如何在Apache Spark中计算百分位数

26

我有一个整数类型的RDD(即RDD[Int]),我想要做的是计算以下十个百分位数:[0th, 10th, 20th, ..., 90th, 100th]。最有效的方法是什么?

10个回答

22
您可以:
  1. 通过rdd.sortBy()对数据集进行排序
  2. 通过rdd.count()计算数据集的大小
  3. 使用zipWithIndex方法来方便地检索百分位数
  4. 通过rdd.lookup()检索所需的百分位数,例如要获取第10个百分位数,可以使用rdd.lookup(0.1 * size)

要计算中位数和第99个百分位数:

getPercentiles(rdd, new double[]{0.5, 0.99}, size, numPartitions);

在Java 8中:

public static double[] getPercentiles(JavaRDD<Double> rdd, double[] percentiles, long rddSize, int numPartitions) {
    double[] values = new double[percentiles.length];

    JavaRDD<Double> sorted = rdd.sortBy((Double d) -> d, true, numPartitions);
    JavaPairRDD<Long, Double> indexed = sorted.zipWithIndex().mapToPair((Tuple2<Double, Long> t) -> t.swap());

    for (int i = 0; i < percentiles.length; i++) {
        double percentile = percentiles[i];
        long id = (long) (rddSize * percentile);
        values[i] = indexed.lookup(id).get(0);
    }

    return values;
}

请注意,这需要对数据集进行排序,时间复杂度为O(n.log(n)),在大型数据集上可能会很昂贵。
另一个建议只计算直方图的答案无法正确计算百分位数:以下是一个反例:一个由100个数字组成的数据集,其中99个数字为0,一个数字为1。你最终会得到所有99个0在第一个箱子中,而1在最后一个箱子中,中间有8个空箱子。

如果N百分比很小,例如10%,20%,那么我将执行以下操作: - Laeeq

7

如何看待t-digest

https://github.com/tdunning/t-digest


一个新的数据结构,用于精确的在线积累基于排名的统计数据,例如分位数和修剪均值。t-digest算法也非常适合并行处理,因此在map-reduce和并行流应用程序中非常有用。
t-digest构建算法使用一维k-means聚类的变体来生成一个与Q-digest相关的数据结构。这个t-digest数据结构可以用于估计分位数或计算其他排名统计量。t-digest相对于Q-digest的优点在于,t-digest可以处理浮点值,而Q-digest仅限于整数。通过小的改变,t-digest可以处理任何具有类似平均值的有序集合中的任何值。尽管t-digest在磁盘存储时更紧凑,但t-digest产生的分位数估计的准确性比Q-digest产生的分位数估计的准确性高出数个数量级。
总之,t-digest的特别有趣的特性是:
- 比Q-digest具有更小的摘要 - 可以处理双精度浮点数以及整数。 - 提供每百万分之一的极端分位数精度和通常低于1000 ppm的中间分位数精度 - 快速 - 非常简单 - 具有超过90%的测试覆盖率的参考实现 - 可以与map-reduce非常容易地一起使用,因为可以合并摘要

使用Spark的参考Java实现应该非常容易。


3
实际上,Erik Erlandson 在这里开发了一个 Spark 版本的实现:https://github.com/isarn/isarn-sketches-spark。它工作得很好。我唯一发现的问题是无法将 TDigest 对象保存为 Parquet 格式。但只要你只是把大量数据丢进去,并请求一些百分位结果,它就是你要找的东西 :) - John Humphreys

4
我发现了这个Gist:

https://gist.github.com/felixcheung/92ae74bc349ea83a9e29

其中包含以下函数:
  /**
   * compute percentile from an unsorted Spark RDD
   * @param data: input data set of Long integers
   * @param tile: percentile to compute (eg. 85 percentile)
   * @return value of input data at the specified percentile
   */
  def computePercentile(data: RDD[Long], tile: Double): Double = {
    // NIST method; data to be sorted in ascending order
    val r = data.sortBy(x => x)
    val c = r.count()
    if (c == 1) r.first()
    else {
      val n = (tile / 100d) * (c + 1d)
      val k = math.floor(n).toLong
      val d = n - k
      if (k <= 0) r.first()
      else {
        val index = r.zipWithIndex().map(_.swap)
        val last = c
        if (k >= c) {
          index.lookup(last - 1).head
        } else {
          index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
        }
      }
    }
  }

3
如果您不介意将RDD转换为DataFrame,并使用Hive UDAF,您可以使用percentile。假设您已经将HiveContext hiveContext加载到作用域中:
hiveContext.sql("SELECT percentile(x, array(0.1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9)) FROM yourDataFrame")
我在this answer中了解到这个Hive UDAF。

2

这是我在Spark上用Python实现的,用于计算包含感兴趣值的RDD的百分位数。

def percentile_threshold(ardd, percentile):
    assert percentile > 0 and percentile <= 100, "percentile should be larger then 0 and smaller or equal to 100"

    return ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0])) \
            .lookup(np.ceil(ardd.count() / 100 * percentile - 1))[0]

# Now test it out
import numpy as np
randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)

print percentile_threshold(ardd,0.001)
print percentile_threshold(ardd,1)
print percentile_threshold(ardd,60.11)
print percentile_threshold(ardd,99)
print percentile_threshold(ardd,99.999)
print percentile_threshold(ardd,100)

# output:
# 1
# 100
# 6011
# 9900
# 10000
# 10000

另外,我定义了以下函数以获取第10到100百分位。

def get_percentiles(rdd, stepsize=10):
    percentiles = []
    rddcount100 = rdd.count() / 100 
    sortedrdd = ardd.sortBy(lambda x: x).zipWithIndex().map(lambda x: (x[1], x[0]))


    for p in range(0, 101, stepsize):
        if p == 0:
            pass
            # I am not aware of a formal definition of 0 percentile, 
            # you can put a place holder like this if you want
            # percentiles.append(sortedrdd.lookup(0)[0] - 1) 
        elif p == 100:
            percentiles.append(sortedrdd.lookup(np.ceil(rddcount100 * 100 - 1))[0])
        else:
            pv = sortedrdd.lookup(np.ceil(rddcount100 * p) - 1)[0]
            percentiles.append(pv)

    return percentiles

randlist = range(1,10001)
np.random.shuffle(randlist)
ardd = sc.parallelize(randlist)
get_percentiles(ardd, 10)

# [1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000]

get_percentiles函数中,sortedrdd的定义应该将ardd替换为rdd。同时添加import numpy as np语句。在numpy 1.11.3版本中好像无法正常工作。 - Jorge Lavín

1
根据这里给出的答案 Spark/Scala中的中位数UDAF,我使用UDAF在Spark窗口(Spark 2.1)上计算百分位数:

首先是一个抽象的通用UDAF,用于其他聚合操作。

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


abstract class GenericUDAF extends UserDefinedAggregateFunction {

  def inputSchema: StructType =
    StructType(StructField("value", DoubleType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("window_list", ArrayType(DoubleType, false)) :: Nil
  )

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new ArrayBuffer[Double]()
  }

  def update(buffer: MutableAggregationBuffer,input: org.apache.spark.sql.Row): Unit = {
    var bufferVal = buffer.getAs[mutable.WrappedArray[Double]](0).toBuffer
    bufferVal+=input.getAs[Double](0)
    buffer(0) = bufferVal
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: org.apache.spark.sql.Row): Unit = {
    buffer1(0) = buffer1.getAs[ArrayBuffer[Double]](0) ++ buffer2.getAs[ArrayBuffer[Double]](0)
  }

  def dataType: DataType
  def evaluate(buffer: Row): Any

}

然后为十分位数定制的百分位数UDAF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer


class DecilesUDAF extends GenericUDAF {

  override def dataType: DataType = ArrayType(DoubleType, false)

  override def evaluate(buffer: Row): Any = {
    val sortedWindow = buffer.getAs[mutable.WrappedArray[Double]](0).sorted.toBuffer
    val windowSize = sortedWindow.size
    if (windowSize == 0) return null
    if (windowSize == 1) return (0 to 10).map(_ => sortedWindow.head).toArray

    (0 to 10).map(i => sortedWindow(Math.min(windowSize-1, i*windowSize/10))).toArray

  }
}

然后,UDAF被实例化并在分区和排序窗口上调用:
val deciles = new DecilesUDAF()
df.withColumn("mt_deciles", deciles(col("mt")).over(myWindow))

您可以使用 getItem 将结果数组拆分为多个列:
def splitToColumns(size: Int, splitCol:String)(df: DataFrame) = {
  (0 to size).foldLeft(df) {
    case (df_arg, i) => df_arg.withColumn("mt_decile_"+i, col(splitCol).getItem(i))
  }
}

df.transform(splitToColumns(10, "mt_deciles" ))

UDA函数比本地Spark函数慢,但只要每个分组的数据包或每个窗口相对较小并且适合单个执行器,那么它就可以正常工作。主要优点是使用Spark并行性。 只需轻微努力,此代码即可扩展到n分位数。

我使用以下函数测试了该代码:

def testDecilesUDAF = {
    val window = W.partitionBy("user")
    val deciles = new DecilesUDAF()

    val schema = StructType(StructField("mt", DoubleType) :: StructField("user", StringType) :: Nil)

    val rows1 = (1 to 20).map(i => Row(i.toDouble, "a"))
    val rows2 = (21 to 40).map(i => Row(i.toDouble, "b"))

    val df = spark.createDataFrame(spark.sparkContext.makeRDD[Row](rows1++rows2), schema)

    df.withColumn("deciles", deciles(col("mt")).over(window))
      .transform(splitToColumns(10, "deciles" ))
      .drop("deciles")
      .show(100, truncate=false)
  }

输出的前3行:

+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|mt  |user|mt_decile_0|mt_decile_1|mt_decile_2|mt_decile_3|mt_decile_4|mt_decile_5|mt_decile_6|mt_decile_7|mt_decile_8|mt_decile_9|mt_decile_10|
+----+----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+
|21.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|22.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |
|23.0|b   |21.0       |23.0       |25.0       |27.0       |29.0       |31.0       |33.0       |35.0       |37.0       |39.0       |40.0        |

1
将您的RDD转换为Double类型的RDD,然后使用.histogram(10)操作。请参见DoubleRDD ScalaDoc

5
histogram(bucketCount)函数并不计算百分位数,它会在RDD的最小值和最大值之间均匀地分配bucketCount个桶,从而“计算数据的直方图”。 - Dmitry

1

如果N百分比很小,如10%、20%,那么我会执行以下操作:

  1. 计算数据集的大小,rdd.count(),如果您已经知道它,请跳过并将其作为参数。

  2. 不是对整个数据集进行排序,而是从每个分区中找出前N个。为此,我需要找到N = rdd.count的N%,然后对分区进行排序并从每个分区中取出前N个。现在,您有一个要排序的小得多的数据集。

3.rdd.sortBy

4.zipWithIndex

5.filter (index < topN)


0

另一种替代方法是在RDD of double上使用top和last。例如,val percentile_99th_value=scores.top((count/100).toInt).last

这种方法更适合计算单个百分位数。


0

这是我的简单方法:

val percentiles = Array(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1)
val accuracy = 1000000
df.stat.approxQuantile("score", percentiles, 1.0/accuracy)

输出:

scala> df.stat.approxQuantile("score", percentiles, 1.0/accuracy)
res88: Array[Double] = Array(0.011044141836464405, 0.02022990956902504, 0.0317261666059494, 0.04638145491480827, 0.06498630344867706, 0.0892181545495987, 0.12161539494991302, 0.16825592517852783, 0.24740923941135406, 0.9188197255134583)

精度:精度参数(默认值:10000)是一个正数,它控制近似精度和内存成本之间的权衡。精度值越高,精度越好,1.0/精度是逼近误差的相对值。


使用GroupBy后会是什么样子?它似乎不理解stat - Xavier John

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接