Pyspark带有条件的窗口函数

12

假设我有一个事件的数据框,其中每一行之间的时间差都不同。主要规则是,如果事件与前一个或后一个事件之间的时间差小于5分钟,则只计算为一个访问量:

+--------+-------------------+--------+
|userid  |eventtime          |timeDiff|
+--------+-------------------+--------+
|37397e29|2017-06-04 03:00:00|60      |
|37397e29|2017-06-04 03:01:00|60      |
|37397e29|2017-06-04 03:02:00|60      |
|37397e29|2017-06-04 03:03:00|180     |
|37397e29|2017-06-04 03:06:00|60      |
|37397e29|2017-06-04 03:07:00|420     |
|37397e29|2017-06-04 03:14:00|60      |
|37397e29|2017-06-04 03:15:00|1140    |
|37397e29|2017-06-04 03:34:00|540     |
|37397e29|2017-06-04 03:53:00|540     |
+--------+----------------- -+--------+

挑战在于按照最新事件时间在5分钟内的开始时间和结束时间进行分组。输出结果应该像这个表格:

+--------+-------------------+--------------------+-----------+
|userid  |start_time         |end_time            |events     |
+--------+-------------------+--------------------+-----------+
|37397e29|2017-06-04 03:00:00|2017-06-04 03:07:00 |6          |
|37397e29|2017-06-04 03:14:00|2017-06-04 03:15:00 |2          |
+--------+-------------------+--------------------+-----------+

到目前为止,我已经使用了窗口滞后函数和一些条件,但是,我不知道接下来该怎么做:

%spark.pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col

windowSpec = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"])
windowSpecDesc = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"].desc())

# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm 
nextEventTime = F.lag(col("eventtime"), -1).over(windowSpec)

# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm 
previousEventTime = F.lag(col("eventtime"), 1).over(windowSpec)
diffEventTime = nextEventTime - col("eventtime")

nextTimeDiff = F.coalesce((F.unix_timestamp(nextEventTime)
            - F.unix_timestamp('eventtime')), F.lit(0))
previousTimeDiff = F.coalesce((F.unix_timestamp('eventtime') -F.unix_timestamp(previousEventTime)), F.lit(0))


# Check if the next POI is the equal to the current POI and has a time differnce less than 5 minutes.
validation = F.coalesce(( (nextTimeDiff < 300) | (previousTimeDiff < 300) ), F.lit(False))

# Change True to 1
visitCheck = F.coalesce((validation == True).cast("int"), F.lit(1))


result_poi.withColumn("visit_check", visitCheck).withColumn("nextTimeDiff", nextTimeDiff).select("userid", "eventtime", "nextTimeDiff", "visit_check").orderBy("eventtime")

我的问题是:这是可行的方法吗?如果是,我如何“向前”并查看满足5分钟条件的最大事件时间。据我所知,迭代Spark SQL列的值是可能的吗?这不会太昂贵吗?是否有其他方法来实现此结果?

@Aku建议的解决方案结果:

+--------+--------+---------------------+---------------------+------+
|userid  |subgroup|start_time           |end_time             |events|
+--------+--------+--------+------------+---------------------+------+
|37397e29|0       |2017-06-04 03:00:00.0|2017-06-04 03:06:00.0|5     |
|37397e29|1       |2017-06-04 03:07:00.0|2017-06-04 03:14:00.0|2     |
|37397e29|2       |2017-06-04 03:15:00.0|2017-06-04 03:15:00.0|1     |
|37397e29|3       |2017-06-04 03:34:00.0|2017-06-04 03:43:00.0|2     |
+------------------------------------+-----------------------+-------+

它没有给出预期的结果。3:07 - 3:14和03:34-03:43被计算为5分钟内的范围,这不应该是这样的。此外,3:07应该是第一行的结束时间,因为它在前一行3:06的5分钟之内。

3个回答

14
你需要一个额外的窗口函数和一个groupby来实现这一点。我们想要的是将时间差大于300的每一行作为一组的结束和新组的开始。Aku的解决方案应该可以工作,只是指标标记了组的开始而不是结束。要更改这个,你需要进行累加和到n-1而不是n(n为当前行):
w = Window.partitionBy("userid").orderBy("eventtime")
DF = DF.withColumn("indicator", (DF.timeDiff > 300).cast("int"))
DF = DF.withColumn("subgroup", func.sum("indicator").over(w) - func.col("indicator"))
DF = DF.groupBy("subgroup").agg(
    func.min("eventtime").alias("start_time"), 
    func.max("eventtime").alias("end_time"),
    func.count("*").alias("events")
 )

+--------+-------------------+-------------------+------+
|subgroup|         start_time|           end_time|events|
+--------+-------------------+-------------------+------+
|       0|2017-06-04 03:00:00|2017-06-04 03:07:00|     6|
|       1|2017-06-04 03:14:00|2017-06-04 03:15:00|     2|
|       2|2017-06-04 03:34:00|2017-06-04 03:34:00|     1|
|       3|2017-06-04 03:53:00|2017-06-04 03:53:00|     1|
+--------+-------------------+-------------------+------+

看起来你也过滤掉了只有一个事件的行,因此:

DF = DF.filter("events != 1")

+--------+-------------------+-------------------+------+
|subgroup|         start_time|           end_time|events|
+--------+-------------------+-------------------+------+
|       0|2017-06-04 03:00:00|2017-06-04 03:07:00|     6|
|       1|2017-06-04 03:14:00|2017-06-04 03:15:00|     2|
+--------+-------------------+-------------------+------+

7

如果我理解正确,你想要在TimeDiff > 300时结束每个分组?这似乎可以使用滚动窗口函数轻松实现:

首先导入一些内容

from pyspark.sql.window import Window
import pyspark.sql.functions as func

然后设置窗口时,我假设您会按用户ID分区。
w = Window.partitionBy("userid").orderBy("eventtime")

然后通过首先标记每个组的第一个成员,然后对该列求和来确定每个观察值所属的子组。

indicator = (TimeDiff > 300).cast("integer")
subgroup = func.sum(indicator).over(w).alias("subgroup")

然后添加一些聚合函数,就可以完成了。
DF = DF.select("*", subgroup)\
.groupBy("subgroup")\
.agg(
    func.min("eventtime").alias("start_time"),
    func.max("eventtime").alias("end_time"),
    func.count(func.lit(1)).alias("events")
)

谢谢@Aku。它没有给出预期的结果。3:07 - 3:14和03:34-03:43被计算为5分钟内的范围,这不应该是这样的。我编辑了问题,并附上了您建议的解决方案的结果,以便您进行验证。 - ultraInstinct
所以你想让start_time和end_time相差不超过5分钟?但是根据"desired output"的例子(时间范围为3:00-3:07),这并不正确,所以我有些困惑。 - aku
是的,确切地说,开始时间和结束时间必须相差不超过5分钟。结束时间为3:07,因为它比前一个时间3:06相差不超过5分钟。 - ultraInstinct

0

可以采用根据时间线标准对数据框进行分组的方法。

您可以创建一个数据框,将行按照每5分钟的时间线进行拆分。 这些行是分组记录的标准, 并且这些行将为每个组设置开始时间和结束时间。

然后找到每个组的计数和最大时间戳(结束时间)。


感谢@Magic。我编辑了我的问题,并使用您的解决方案得出了与Aku类似的结果。 - ultraInstinct

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接