合并数据框并选择最新的记录。

3

I have 2 dataframes.

df1:

|Timestamp                        |ProjectId|AusID|Version|
+---------------------------------+---------+-------------+
|2017-09-19 16:57:36.000642 +02:00|20034    |529  |2017   |
|2017-09-19 16:58:32.000642 +02:00|20035    |973  |2017   |
|2017-09-21 12:51:36.000642 +02:00|20034    |521  |2017   |
|2017-09-22 17:58:36.000642 +02:00|20035    |543  |2017   |

df2:

|Timestamp                        |ProjectId|AusID|Version|
+---------------------------------+---------+-------------+
|2017-09-20 08:46:17.465000 Z     |20034    |513  |2017   |
|2017-09-20 08:46:17.465000 Z     |20035    |973  |2017   |
|2017-09-21 08:46:17.465000 Z     |20034    |521  |2017   |
|2017-09-22 08:46:17.465000 Z     |20035    |587  |2017   |

这些记录都是以百万计且包含更多列。我想合并这两个数据框,并使用AusID删除重复项,即当两个记录具有相同的AusID时,选择最新的记录(基于日期)并删除另一个记录。另一个问题是,日期在两个数据框中的格式也不相同。
我尝试使用以下方法:
df1.union(df2).except(df1.intersect(df2)).show()  

但是似乎它正在考虑所有列,如果有人能给一些提示就太好了。


1
将两个数据框联合起来,并使用窗口函数进行分组和选择最新的时间戳。 - koiralo
@ShankarKoirala 数据大小在几百GB。合并所有记录是一个好主意吗? - Waqar Ahmed
1
@WaqarAhmed Union操作不会打乱数据,因此我们不必担心性能问题。 - himanshuIIITian
1
好的,我会研究窗口函数和联合查询。谢谢。 - Waqar Ahmed
@himanshuIIITian 窗口函数或排序是否会进行洗牌? - Waqar Ahmed
1个回答

3
你可以考虑以下方法:

你可以考虑以下方法:

result = df1.unionAll(df2)
import org.apache.spark.sql.expressions._

val windowSpec = Window.partitionBy("ProjectId","AusID","Version").orderBy(col("Timestamp").asc)
val latestForEachKey = result.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")
latestForEachKey.show(false)

谢谢。我们需要按除日期以外的所有列进行分区吗? - Waqar Ahmed
是的,我们必须按除日期外的所有列进行分区。 - Mugdha
我可以问一下这背后的直觉吗? - Waqar Ahmed
以上任何操作都会进行洗牌吗? - Waqar Ahmed
不,没有洗牌。 - Mugdha
但是在Spark UI中,我可以看到有一个shuffledRDD,我不知道它从哪里来。 - Waqar Ahmed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接