在pySpark中应用窗口函数来计算差异

33

我正在使用 pySpark,并已经设置了一个数据框,其中两列代表每日资产价格如下:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])

我执行 df.show() 后得到:

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+

这都没问题。我想再加一列,包括价格每日收益,即像下面这样:

(price(day2)-price(day1))/(price(day1))

经过很多研究,我被告知最有效的方法是应用 pyspark.sql.window 函数,但我不知道如何操作。


我假设sqlCtx是等同于使用sc = SparkContext('local')获取的'spark'对象 spark = SparkSession(sc) - Nir
2个回答

60

您可以使用 lag 函数带入前一天的列,并添加额外的列来计算两列之间的实际日回报率。但是,您可能需要告诉 Spark 如何对数据进行分区和/或排序以执行 lag 操作,例如:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn('user', lit('tmoore'))

df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                 .over(Window.partitionBy("user")))

result = df_lag.withColumn('daily_return', 
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'] )

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+

这里是有关Spark中窗口函数的更长介绍


1
嗨。谢谢!那非常有用。顺便问一下,“lit”函数是做什么的? - Thomas Moore
2
lit - 创建一个字面值列 - https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.lit - Oleksiy
11
小提示:在使用 lag 函数时,按应用 lag 函数的列进行排序是一个很好的实践,例如 Window.partitionBy("user").orderBy("day", ascending=True)。 - Quetzalcoatl
1
评估 df_lag 时出现错误:窗口函数 lag(price#66, 1, null) 需要有序的窗口,dfu.withColumn('prev_day_price',func.lag(dfu['price']).over(Window.orderBy("user"))) 可以解决这个问题。 - Nir
如何使用Spark结构化流实现这一目标? - Nagesh
有人能建议如何使用Spark Streaming实现这个吗? - Nagesh

6
Lag函数可以帮助您解决使用案例。
from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window 
Windowspec=Window.orderBy("day")

### Calculating lag of price at each day level
prev_day_price= df.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                .over(Windowspec))

### Calculating the average                                  
result = prev_day_price.withColumn('daily_return', 
          (prev_day_price['price'] - prev_day_price['prev_day_price']) / 
prev_day_price['price'] )

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接