pyspark 1.6中替代pandas quantile和cut的方法有哪些?

5

我是pyspark的新手。我有以下类似于pandas的代码。

bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()

df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

我发现在pyspark 2.x中有'approxQuantile'方法,但在pyspark 1.6.0中没有。

我的样例输入:

df.show()

+-----------+----------+---------------+--------------+------------------------+
|  id       | col_1    |col_2          |col_3         |col_4                   |
+-----------+----------+---------------+--------------+------------------------+
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.900000|
|1.10919E+16|3988487.35|     -236751.43|    -362208.07|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.860000|
|1.10919E+16|  36718.55|           null|          null|                0.780000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|
|1.10919E+16|  36718.55|           null|          null|                0.900000|
|1.10919E+16|  36718.55|           null|          null|                0.660000|

df.collect()

[Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.080000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.780000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.860000')), 
Row(id=u'1.11312E+16', col_1=Decimal('367364.44'), col_2=Decimal('-401715.23'), col_3=Decimal('-1649917.53'), col_4=Decimal('0.330000'))]

我需要对所有输入列循环上述逻辑。
for var in df.columns:
    bindt = df[df[var].notnull()][var].quantile([0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]).unique()    
    df['{0}_quartile'.format(var)] = pd.cut(df[var], bindt, labels=False, include_lowest=True )

有人能提供一下如何将上述代码重写为pyspark 1.6 dataframe的建议吗?
谢谢。

1
请将示例输入变得可执行。像+-|这样的无意义符号使得复制粘贴数据样本变得非常困难。此外,您代码示例中的var不在数据框中/不是一个定义的变量。 - JE_Muc
@Scotty1- 我已经编辑了我的问题。请检查一下,如果你找到任何解决方案,请告诉我。谢谢 - Valli69
重写是指将代码从pandas转换为pyspark,还是循环遍历pandas数据框,并将其插入到pyspark数据框中? - xilpex
@Xilpex - 是的,我想把这段代码从pandas转换到pyspark。 - Valli69
1个回答

8
如果您正在使用pyspark 2.x,可以使用ml库中的QuantileDiscretizer,该库在内部使用approxQuantile()Bucketizer
但是,由于您使用的是pyspark 1.6.x,因此需要执行以下操作:

1. 查找列的分位数值

有两种方法可以查找分位数值:

  1. 通过计算percent_rank()来计算列的百分位数,并提取具有接近所需分位数的百分位值的列值

  2. 按照this answer中解释的方法进行操作,该方法说明如何使用pyspark < 2.0.0执行分位数近似

以下是我的分位数实现示例:

from pyspark.sql import functions as F
from pyspark.sql import Window

def compute_quantiles(df, col, quantiles):
  quantiles = sorted(quantiles)

  # 1. compute percentile
  df = df.withColumn("percentile", F.percent_rank().over(Window.orderBy(col)))

  # 2. categorize quantile based on the desired quantile and compute errors
  df = df.withColumn("percentile_cat1", F.lit(-1.0))
  df = df.withColumn("percentile_err1", F.lit(-1.0))
  df = df.withColumn("percentile_cat2", F.lit(-1.0))
  df = df.withColumn("percentile_err2", F.lit(-1.0))

  # check percentile with the lower boundaries
  for idx in range(0, len(quantiles)-1):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat1", F\
                       .when( (F.col("percentile_cat1") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat1")))
    df = df.withColumn("percentile_err1", F\
                       .when( (F.col("percentile_err1") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err1")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err1", F\
                     .when(F.col("percentile_err1") == -1.0, 1)\
                     .otherwise(F.col("percentile_err1")))

  # check percentile with the upper boundaries
  for idx in range(1, len(quantiles)):
    q = quantiles[idx]
    df = df.withColumn("percentile_cat2", F\
                       .when((F.col("percentile_cat2") == -1.0) & 
                             (F.col("percentile") <= q), q)\
                       .otherwise(F.col("percentile_cat2")))
    df = df.withColumn("percentile_err2",F\
                       .when((F.col("percentile_err2") == -1.0) & 
                             (F.col("percentile") <= q), 
                             F.pow(F.col("percentile") - q, 2))\
                       .otherwise(F.col("percentile_err2")))

  # assign the remaining -1 values in the error to the largest squared error of 1
  df = df.withColumn("percentile_err2", F\
                     .when(F.col("percentile_err2") == -1.0, 1)\
                     .otherwise(F.col("percentile_err2")))

  # select the nearest quantile to the percentile
  df = df.withColumn("percentile_cat", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_cat1"))\
                     .otherwise(F.col("percentile_cat2")))
  df = df.withColumn("percentile_err", F\
                     .when(F.col("percentile_err1") < F.col("percentile_err2"), 
                           F.col("percentile_err1"))\
                     .otherwise(F.col("percentile_err2")))

  # 3. approximate quantile values by choosing the value with the lowest error at each percentile category
  df = df.withColumn("approx_quantile", F\
                     .first(col).over(Window\
                                      .partitionBy("percentile_cat")\
                                      .orderBy(F.asc("percentile_err"))))

  return df

def extract_quantiles(df):
  df_quantiles = df.select("percentile_cat", "approx_quantile").distinct()
  rows = df_quantiles.collect()
  quantile_values = [ row.approx_quantile for row in rows ]

  return quantile_values

我想要实现的是计算每一行在列中的百分位,并将其归类到最近的分位数。将百分位归类到最近的分位数可以通过选择具有最低差异(平方误差)的最低分位数类别来完成。 1. 计算百分位 首先,我使用pyspark中的Window函数 percent_rank() 计算列的百分位。您可以将Window视为数据的分区规范。由于percent_rank()是一个Window函数,因此需要传入Window。 2. 将百分位归类到分位数边界并计算误差 最接近一个百分位数的四分位类别可以是低于、等于或高于它。因此,我需要计算两次误差:首先将百分位数与下限四分位边界进行比较,然后再将其与上限四分位边界进行比较。请注意,≤ 运算符用于检查百分位数是否小于或等于边界。在知道了一个百分位数的直接上限和下限四分位边界之后,我们可以通过选择具有最低误差的下限或上限分类来将百分位数分配到最接近的四分位类别中。 3. 近似四分位值 一旦我们知道每个百分位数的最近分位类别,我们就可以近似计算分位值:即在每个分位类别中具有最低误差的值。这些近似的分位值可以使用first()函数在每个类别分区中使用Window进行计算。接下来,为了提取分位值,我们只需从数据框中选择唯一的percentileCategory-approxQuantileValue对。
在测试了我的数据(约10000行)并使用desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]后,我发现我的示例实现与approxQuantile结果非常接近。随着我减少提供给approxQuantile的误差,两个结果值变得更加接近。
使用extract_quantiles(compute_quantile(df, col, quantiles)):

enter image description here

使用approxQuantile:

enter image description here

2. 使用 Bucketizer

在找到分位数值之后,您可以使用 pyspark 的 Bucketizer 根据分位数对值进行分桶。Bucketizer 在 pyspark 1.6.x [1][2] 和 2.x [3][4] 中都可用。

以下是如何执行分桶的示例:

from pyspark.ml.feature import Bucketizer

bucketedData = df
desired_quantiles = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0] # must be sorted

for col in df.columns:
  quantile_values = extract_quantiles(compute_quantiles(df, col, desired_quantiles))
  splits = [ boundary_values ] # replace this with quantile_values

  bucketizer = Bucketizer()\
    .setInputCol(col)\
    .setOutputCol("{}_quantile".format(col))\
    .setSplits(splits)
  bucketedData = bucketizer.transform(bucketedData)

您可以将value_boundaries替换为第一步中找到的分位数值或任何您想要的桶分割范围。当使用bucketizer时,整个列的值范围必须在拆分内涵盖。否则,指定拆分之外的值将被视为错误。如果您不确定数据的值边界,则必须明确提供无限值,例如-float(“inf”)float(“inf”)以涵盖所有浮点值。

嗨@dekauliya - 感谢您的回复。我有几个问题。1)desired_quantiles是否意味着[0,.1,.2,.3,.4,.5,.6,.7,.8,.9,1]?2)我没有理解第二步,我的意思是计算percentile_cat1,percentile_cat2和percentile_err1。您能否详细解释一下,因为我是初学者。3)我们可以将此“compute_quantiles”方法用作pyspark 1.6中“approxQuantile”的替代品吗? - Valli69
  1. 好的,在尝试了代码并将其与approxQuantile获得的结果进行比较后,我的实验算法似乎是可比较的,并且可以用作approxQuantile的替代品。但是,如果您有pyspark 2.x可用,最好使用approxQuantile
- dekauliya
  1. 是的。
  2. 我想要实现的是计算每行在列中的百分位数,并将其归类到最近的分位数,这有点长,我会更新我的答案。
- dekauliya

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接