How to get the second highest value of a column in PySpark?

5
I have a PySpark DataFrame and, after a groupBy on the two columns CUSTOMER_ID and ADDRESS_ID, I want to get the second highest value of ORDERED_TIME (a DateTime field in yyyy-mm-dd format).
A customer can have many orders associated with an address, and I want to get the second most recent order for each (customer, address) pair.
My approach was to create a window partitioned by CUSTOMER_ID and ADDRESS_ID and sorted by ORDERED_TIME:
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))

However, I get an error saying ValueError: 'over' is not in list

Any suggestions on the right way to go about solving this?

Please let me know if any other information is needed.

Sample data

+-----------+----------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  | 
+-----------+----------+-------------------+
|        100| 1000     |2021-01-02         |
|        100| 1000     |2021-01-14         |
|        100| 1000     |2021-01-03         |
|        100| 1000     |2021-01-04         |
|        101| 2000     |2020-05-07         |
|        101| 2000     |2021-04-14         |
+-----------+----------+-------------------+

Expected output

+-----------+----------+-------------------+-------------------+
|USER_ID    |ADDRESS_ID|       ORDER DATE  |second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 1000     |2021-01-02         |2021-01-04         |
|        100| 1000     |2021-01-14         |2021-01-04         |
|        100| 1000     |2021-01-03         |2021-01-04         |
|        100| 1000     |2021-01-04         |2021-01-04         |
|        101| 2000     |2020-05-07         |2020-05-07         |
|        101| 2000     |2021-04-14         |2020-05-07         |
+-----------+----------+-------------------+-------------------+

Could you please provide some sample data along with the expected output, thanks. - Ric S
Sorry I missed that, I have edited the question with the sample input and output. - Jitesh Malipeddi
4 Answers

5

Here is another approach, using the collect_list function.

import pyspark.sql.functions as F
from pyspark.sql import Window


sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.col("ORDERED_TIME").desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df2 = (
  df
  .withColumn("second_recent_order", (F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times))[1])
)
df2.show()

Final output
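As a side note on the explicit frame: when a window has an orderBy, Spark's default frame is rangeBetween(Window.unboundedPreceding, Window.currentRow), so without the explicit unbounded frame the most recent row in each group would only see itself and [1] would come back null. A minimal sketch of the difference, assuming a DataFrame df with the columns used above:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Default frame (unboundedPreceding -> currentRow): the collected list grows row
# by row, so the most recent row's list has one element and [1] is null.
w_default = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.col("ORDERED_TIME").desc())

# Explicit full frame: every row sees the whole partition, so [1] is always the
# second most recent order (or null when the group has a single row).
w_full = w_default.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.select(
    "CUSTOMER_ID",
    "ADDRESS_ID",
    F.collect_list("ORDERED_TIME").over(w_default)[1].alias("default_frame"),
    F.collect_list("ORDERED_TIME").over(w_full)[1].alias("full_frame"),
).show()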


1
You can use a window in the following way, but you will get null if there is only one row in a group:

from pyspark.sql import Window
from pyspark.sql.functions import collect_list, desc

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID") \
    .orderBy(desc('ORDERED_TIME')) \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df2 = df2.withColumn(
    "second_recent_order",
    collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)
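If a group with a single row should fall back to its only order instead of null, one option (a sketch on top of the window defined above, not part of the original answer) is to coalesce with the first element:

from pyspark.sql.functions import coalesce, collect_list

df2 = df2.withColumn(
    "second_recent_order",
    coalesce(
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1),  # second most recent
        collect_list("ORDERED_TIME").over(sorted_order_times).getItem(0)   # only order in the group
    )
)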


0
One solution would be to create a lookup table with the second most recent order for all pairs of CUSTOMER_ID and ADDRESS_ID, and join it back to the original dataframe.
I am assuming that your ORDERED_TIME column is already of timestamp type.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# define window
w = Window().partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))

# create lookup table
second_highest = df \
  .withColumn('rank', F.dense_rank().over(w)) \
  .filter(F.col('rank') == 2) \
  .select('CUSTOMER_ID', 'ADDRESS_ID', F.col('ORDERED_TIME').alias('second_recent_order'))

# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')

df.show()

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
|        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
|        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
|        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+

Note: in your expected output you wrote 2021-04-14 19:14:08 for CUSTOMER_ID == 101, but it should really be 2020-05-07 13:35:57, since it is from 2020.
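If ORDERED_TIME is still a string, a minimal cast could look like the following (the format string is an assumption based on the sample data and may need adjusting):

import pyspark.sql.functions as F

# assumed format; change it to match the actual string representation
df = df.withColumn('ORDERED_TIME', F.to_timestamp('ORDERED_TIME', 'yyyy-MM-dd HH:mm:ss'))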


0

Two windows can be used: an ordered window to get the rows in the correct order, and an unordered window combined with the "first" function to pick out the second row (Scala):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df2 = Seq(
  (100, 158932441, "2021-01-02 13:35:57"),
  (100, 158932441, "2021-01-14 19:14:08"),
  (100, 158932441, "2021-01-03 13:33:52"),
  (100, 158932441, "2021-01-04 09:36:10"),
  (101, 281838494, "2020-05-07 13:35:57"),
  (101, 281838494, "2021-04-14 19:14:08")
).toDF("CUSTOMER_ID", "ADDRESS_ID", "ORDERED_TIME")

// rows within each (customer, address) group, most recent first
val sorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")
  .orderBy(desc("ORDERED_TIME"))

// unordered window, so "first" can see the whole partition
val unsorted_order_times = Window
  .partitionBy("CUSTOMER_ID", "ADDRESS_ID")

df2
  .withColumn("row_number", row_number().over(sorted_order_times))
  .withColumn("second_recent_order",
    first(
      when($"row_number" === lit(2), $"ORDERED_TIME").otherwise(null), true
    ).over(unsorted_order_times))
  .drop("row_number")
  .show(false)

Output:

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|ORDERED_TIME       |second_recent_order|
+-----------+----------+-------------------+-------------------+
|100        |158932441 |2021-01-14 19:14:08|2021-01-04 09:36:10|
|100        |158932441 |2021-01-04 09:36:10|2021-01-04 09:36:10|
|100        |158932441 |2021-01-03 13:33:52|2021-01-04 09:36:10|
|100        |158932441 |2021-01-02 13:35:57|2021-01-04 09:36:10|
|101        |281838494 |2021-04-14 19:14:08|2020-05-07 13:35:57|
|101        |281838494 |2020-05-07 13:35:57|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+
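For completeness, a rough PySpark sketch of the same two-window idea (assuming a DataFrame df2 with the same column names; not part of the original answer):

import pyspark.sql.functions as F
from pyspark.sql import Window

# ordered window: most recent order first within each (customer, address) group
sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(F.desc("ORDERED_TIME"))
# unordered window: the default frame covers the whole partition
unsorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")

result = (
    df2
    .withColumn("row_number", F.row_number().over(sorted_order_times))
    .withColumn(
        "second_recent_order",
        F.first(
            F.when(F.col("row_number") == 2, F.col("ORDERED_TIME")),  # null on every other row
            ignorenulls=True,
        ).over(unsorted_order_times),
    )
    .drop("row_number")
)
result.show(truncate=False)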
