这可能最容易通过示例来解释。假设我有一个网站用户登录的DataFrame,例如:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
我想在表格中添加一列,用于指示用户何时成为了该网站的活跃用户。但是有一个限制条件:在一段时间内,用户才被认为是活跃用户,在此期间之后,如果他们再次登录,则他们的 became_active
日期将被重置。假设这段时间是5天。那么从上面的表格中得到的所需表格将类似于以下内容:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
因此,特别地,SirChillingtonIV的became_active
日期被重置,因为他们的第二次登录在活动期限过期之后,但是Booooooo99900098的became_active
日期在第二次登录时没有被重置,因为它在活动期间内。
我的初步想法是使用带有lag
的窗口函数,然后使用lag
值来填充became_active
列。例如,大致如下:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
那么,填写became_active
日期的规则将是,如果tmp
为null
(即,如果这是第一次登录),或者如果login_date - tmp >= 5
,则 became_active = login_date
; 否则,转到tmp
中下一个最近的值,并应用相同的规则。这表明了一种递归方法,我无法想象如何实现。
我的问题是:这是可行的方法吗?如果是,我如何“返回”并查看tmp
之前的早期值,直到找到一个停止的值?据我所知,我不能迭代Spark SQL Column
的值。是否有另一种方法来实现此结果?
datediff($"login_date", lag($"login_date", 1).over(userWindow))
的结果为null
(在窗口的第一行),则返回0。 - zero323val sessionized = df.withColumn("session", sum(newSession).over(userWindow))
是如何增加计数的呢? - Sanchit Grover