我有一个pyspark数据框,包含以下示例行。我正在尝试在10分钟的时间跨度内获取最大平均值。我尝试使用窗口函数,但无法实现结果。
下面是我的数据框,包含30分钟的随机数据。我期望输出3行,每10分钟1行。
+-------------------+---------+
| event_time|avg_value|
+-------------------+---------+
|2019-12-29 00:01:00| 9.5|
|2019-12-29 00:02:00| 9.0|
|2019-12-29 00:04:00| 8.0|
|2019-12-29 00:06:00| 21.0|
|2019-12-29 00:08:00| 7.0|
|2019-12-29 00:11:00| 8.5|
|2019-12-29 00:12:00| 11.5|
|2019-12-29 00:14:00| 8.0|
|2019-12-29 00:16:00| 31.0|
|2019-12-29 00:18:00| 8.0|
|2019-12-29 00:21:00| 8.0|
|2019-12-29 00:22:00| 16.5|
|2019-12-29 00:24:00| 7.0|
|2019-12-29 00:26:00| 14.0|
|2019-12-29 00:28:00| 7.0|
+-------------------+---------+
我将使用以下代码进行此操作。
window_spec = Window.partitionBy('event_time').orderBy('event_time').rangeBetween(-60*10,0)
new_df = data.withColumn('rank', rank().over(window_spec))
new_df.show()
但是这段代码给了我以下错误:
pyspark.sql.utils.AnalysisException: 'Window Frame specifiedwindowframe(RangeFrame, -600, currentrow$()) must match the required frame specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$());'
我的期望输出是
+-------------------+---------+
| event_time|avg_value|
+-------------------+---------+
|2019-12-29 00:06:00| 21.0|
|2019-12-29 00:16:00| 31.0|
|2019-12-29 00:22:00| 16.5|
+-------------------+---------+
有人可以帮我解决这个问题吗?
谢谢。
高阶函数
filter
来遍历这两列的结构,然后获取我们所需的时间。看一下这些函数,它们非常有帮助。 - murtihash