如何在PySpark数据框中添加一列,该列包含数据框中另一列的第n分位数

4
我有一个非常大的CSV文件,已经作为PySpark dataframe导入:df。数据框包含许多列,包括列ireturn。我想计算该列的0.99和0.01百分位数,然后将另一列添加到dataframe df中,如new_col_99new_col_01,它们分别包含0.99和0.01百分位数。我编写了以下代码,在小型数据框上运行良好,但在应用于大型数据框时会出现错误。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("name of the file", inferSchema = True, header = True)

precentile_99 = df.selectExpr('percentile(val1, 0.99)').head(1)[0][0]
precentile_01 = df.selectExpr('percentile(val1, 0.01)').head(1)[0][0]
from pyspark.sql.functions import lit
df = df.withColumn("new_col_99", lit(precentile_99))
df = df.withColumn("new_col_01", lit(precentile_01))

我试图用collect替换head,但也没有起作用。我收到了以下错误信息:

记录错误 ---
错误:py4j.java_gateway:尝试连接Java服务器(127.0.0.1:49850)时发生错误
跟踪(最近的调用):...

我还尝试了以下方法:
percentile = df.approxQuantile('ireturn',[0.01,0.99],0.25)
df = df.withColumn("new_col_01", lit(percentile[0]))
df = df.withColumn("new_col_99", lit(percentile[1]))

以上代码运行大约需要15-20分钟,但结果是错误的(我的ireturn列上的数据小于1,但它将0.99百分位数返回为6789....)


你具体遇到了什么错误?如果原因是内存不足,你可以尝试使用percentile_approx - martinarroyo
我更新了我的回答。你能否举一个例子使用percentile_approx函数?我不确定它的格式是什么。 - Monirrad
它具有与“percentile”相同的语法。 - martinarroyo
1个回答

0
晚了一些,但希望能解决您的问题。您可以通过以下方式获得结果:
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("name of the file", inferSchema = True, header = True)

df = df.withColumn("new_col_99", F.expr('percentile(val1, 0.99) over()'))
df = df.withColumn("new_col_01", F.expr('percentile(val1, 0.01) over()'))

对于大型数据集,percentile_approx 可能更好:

df = df.withColumn("new_col_99", F.expr('percentile_approx(val1, 0.99) over()'))
df = df.withColumn("new_col_01", F.expr('percentile_approx(val1, 0.01) over()'))

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接