假设我有一个事件的数据框,其中每一行之间的时间差都不同。主要规则是,如果事件与前一个或后一个事件之间的时间差小于5分钟,则只计算为一个访问量:
+--------+-------------------+--------+
|userid |eventtime |timeDiff|
+--------+-------------------+--------+
|37397e29|2017-06-04 03:00:00|60 |
|37397e29|2017-06-04 03:01:00|60 |
|37397e29|2017-06-04 03:02:00|60 |
|37397e29|2017-06-04 03:03:00|180 |
|37397e29|2017-06-04 03:06:00|60 |
|37397e29|2017-06-04 03:07:00|420 |
|37397e29|2017-06-04 03:14:00|60 |
|37397e29|2017-06-04 03:15:00|1140 |
|37397e29|2017-06-04 03:34:00|540 |
|37397e29|2017-06-04 03:53:00|540 |
+--------+----------------- -+--------+
挑战在于按照最新事件时间在5分钟内的开始时间和结束时间进行分组。输出结果应该像这个表格:
+--------+-------------------+--------------------+-----------+
|userid |start_time |end_time |events |
+--------+-------------------+--------------------+-----------+
|37397e29|2017-06-04 03:00:00|2017-06-04 03:07:00 |6 |
|37397e29|2017-06-04 03:14:00|2017-06-04 03:15:00 |2 |
+--------+-------------------+--------------------+-----------+
到目前为止,我已经使用了窗口滞后函数和一些条件,但是,我不知道接下来该怎么做:
%spark.pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window as W
from pyspark.sql.functions import col
windowSpec = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"])
windowSpecDesc = W.partitionBy(result_poi["userid"], result_poi["unique_reference_number"]).orderBy(result_poi["eventtime"].desc())
# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm
nextEventTime = F.lag(col("eventtime"), -1).over(windowSpec)
# The windows are between the current row and following row. e.g: 3:00pm and 3:03pm
previousEventTime = F.lag(col("eventtime"), 1).over(windowSpec)
diffEventTime = nextEventTime - col("eventtime")
nextTimeDiff = F.coalesce((F.unix_timestamp(nextEventTime)
- F.unix_timestamp('eventtime')), F.lit(0))
previousTimeDiff = F.coalesce((F.unix_timestamp('eventtime') -F.unix_timestamp(previousEventTime)), F.lit(0))
# Check if the next POI is the equal to the current POI and has a time differnce less than 5 minutes.
validation = F.coalesce(( (nextTimeDiff < 300) | (previousTimeDiff < 300) ), F.lit(False))
# Change True to 1
visitCheck = F.coalesce((validation == True).cast("int"), F.lit(1))
result_poi.withColumn("visit_check", visitCheck).withColumn("nextTimeDiff", nextTimeDiff).select("userid", "eventtime", "nextTimeDiff", "visit_check").orderBy("eventtime")
我的问题是:这是可行的方法吗?如果是,我如何“向前”并查看满足5分钟条件的最大事件时间。据我所知,迭代Spark SQL列的值是可能的吗?这不会太昂贵吗?是否有其他方法来实现此结果?
@Aku建议的解决方案结果:
+--------+--------+---------------------+---------------------+------+
|userid |subgroup|start_time |end_time |events|
+--------+--------+--------+------------+---------------------+------+
|37397e29|0 |2017-06-04 03:00:00.0|2017-06-04 03:06:00.0|5 |
|37397e29|1 |2017-06-04 03:07:00.0|2017-06-04 03:14:00.0|2 |
|37397e29|2 |2017-06-04 03:15:00.0|2017-06-04 03:15:00.0|1 |
|37397e29|3 |2017-06-04 03:34:00.0|2017-06-04 03:43:00.0|2 |
+------------------------------------+-----------------------+-------+
它没有给出预期的结果。3:07 - 3:14和03:34-03:43被计算为5分钟内的范围,这不应该是这样的。此外,3:07应该是第一行的结束时间,因为它在前一行3:06的5分钟之内。