collect_list 保持顺序(SQL/Spark Scala)

3

我有这样一个表:

Clients   City   Timestamp
1         NY        0
1         WDC       10
1         NY        11    
2         NY        20
2         WDC       15

我想要的输出是根据时间戳收集所有城市(每个时间戳每个用户只有一个唯一的城市)。但是不显示时间戳。最终列表必须只按顺序包含城市。因此,对于该示例,它会得到以下结果:

Clients   my_list   Timestamp
1         NY - WDC - NY
2         WDC - NY

也许,我应该使用时间戳生成一个列表。然后在该列表中删除时间戳。我不知道......
我正在使用Scala的Spark SQL。因此,我尝试在SQL或Scala中使用collect_list,但似乎在使用之后我们失去了排序。
你能帮我解决这个问题吗?

对于客户端2,我认为您已经错误地发布了城市订单。您想按时间戳的递增顺序进行排序,对吗@salamanka44? - apnith
@apnith 当然可以。我想要按时间戳的递增顺序排列。我刚刚纠正了我的错误。对于错误感到抱歉! - salamanka44
3个回答

3
我只需要做以下几点:
val a = Seq((1,"NY",0),(1,"WDC",10),(1,"NY",11),(2,"NY",20),(2,"WDC",15))
    .toDF("client", "city", "timestamp")

val w = Window.partitionBy($"client").orderBy($"timestamp")
val b = a.withColumn("sorted_list", collect_list($"city").over(w))

在这里,我使用了Window函数,对客户端进行了分区并按时间戳进行排序。此时,您将获得如下的数据框:

+------+----+---------+-------------+
|client|city|timestamp|sorted_list  |
+------+----+---------+-------------+
|1     |NY  |0        |[NY]         |
|1     |WDC |10       |[NY, WDC]    |
|1     |NY  |11       |[NY, WDC, NY]|
|2     |WDC |15       |[WDC]        |
|2     |NY  |20       |[WDC, NY]    |
+------+----+---------+-------------+

在这里,你创建了一个新列sorted_list,其中包含按时间戳排序的值列表,但是每个客户端有重复的行。为了移除重复的行,请groupBy客户端并保留每个组中的最大值:

val c = b
        .groupBy($"client")
        .agg(max($"sorted_list").alias("sorted_timestamp"))
.show(false)

+------+----------------+
|client|sorted_timestamp|
+------+----------------+
|1     |[NY, WDC, NY]   |
|2     |[WDC, NY]       |
+------+----------------+


如果这个答案解决了你的问题,请接受它。 - apnith
我认为你没有很好地理解我的信息。我想要的最终输出是按时间戳排序的城市列表(基于时间戳)。而不是按时间戳排序(在最终输出中,我不会显示时间戳...)。现在你明白了吗? - salamanka44
@salamanka44,如果您仔细阅读/理解解决方案,我所发布的内容足以得出您需要的答案。我已在此更新了collect_list。其余部分保持不变。希望您已经掌握了这个概念。如果您仍然有任何困难,请告诉我。 - apnith

1
# below can be helpful for you to achieve your target
val input_rdd = spark.sparkContext.parallelize(List(("1","NY","0"),("1","WDC","10"),("1","NY","11"),("2","NY","20"),("2","WDC","15")))
val input_df = input_rdd.toDF("clients","city","Timestamp")
val winspec1 = Window.partitionBy($"clients").orderBy($"Timestamp")
val input_df1 = input_df.withColumn("collect", collect_list($"city").over(winspec1))
input_df1.show
Output:
+-------+----+---------+-------------+
|clients|city|Timestamp|      collect|
+-------+----+---------+-------------+
|      1|  NY|        0|         [NY]|
|      1| WDC|       10|    [NY, WDC]|
|      1|  NY|       11|[NY, WDC, NY]|
|      2| WDC|       15|        [WDC]|
|      2|  NY|       20|    [WDC, NY]|
+-------+----+---------+-------------+

val winspec2 = Window.partitionBy($"clients").orderBy($"Timestamp".desc)
input_df1.withColumn("number", row_number().over(winspec2)).filter($"number" === 1).drop($"number").drop($"Timestamp").drop($"city").show
Output:
+-------+-------------+
|clients|      collect|
+-------+-------------+
|      1|[NY, WDC, NY]|
|      2|    [WDC, NY]|

0

自 Spark 2.4 开始,您可以应用创建带有时间戳和城市的对象的第一个想法,将这些对象收集为列表,对列表进行排序,然后删除列表中每个对象的时间戳:

import org.apache.spark.sql.functions.{array_sort, col, collect_list, struct}

val result = inputDf.groupBy("Clients")
  .agg(
    array_sort(
      collect_list(
        struct(col("Timestamp"), col("City"))
      )
    ).getField("City").as("Cities")
  )

使用以下 inputDf 数据框:

+------+----+---------+
|Client|City|Timestamp|
+------+----+---------+
|1     |NY  |0        |
|1     |WDC |10       |
|1     |NY  |11       |
|2     |NY  |20       |
|2     |WDC |15       |
+------+----+---------+

您将获得以下“result”数据框:
+------+-------------+
|Client|Cities       |
+------+-------------+
|1     |[NY, WDC, NY]|
|2     |[WDC, NY]    |
+------+-------------+

使用这种方法,您将仅对输入数据框进行一次洗牌。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接