Removing duplicate rows with older timestamps using a PySpark DataFrame

3

I have a PySpark DataFrame with starttime and stoptime columns, plus other columns whose values get updated.

|startime  |stoptime  |hour  |minute  |sec  |sip          |dip            |sport|dport|proto|pkt |byt |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801216|16    |20      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801216|16    |20      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504766585|1504801336|16    |22      |16   |192.168.0.11 |23.204.108.58  |51249|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |
|1504781751|1504801336|16    |22      |16   |192.168.0.11 |23.72.38.96    |51252|80   |6    |0   |0   |

In this example I want to select all rows with the most recent stoptime; the values in all the other columns are duplicates.
2 Answers

4

I guess you want to keep the latest record for each sport. You should use a window function to determine the latest record in each partition:

import pyspark.sql.functions as psf
from pyspark.sql import Window
# One partition per sport value, ordered so the most recent stoptime comes first
w = Window.partitionBy("sport").orderBy(psf.desc("stoptime"))

# Number the rows within each partition and keep only the latest one
df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn").show()

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

Since you partitioned by sport, you will get as many records as there are distinct sport values.
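As a quick sanity check (a small sketch reusing the df and w defined above), the deduplicated result should contain exactly one row per distinct sport value:

# Sanity check: one output row per distinct sport value
n_sports = df.select("sport").distinct().count()
deduped = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")
assert deduped.count() == n_sports  # 2 in this example: sports 51249 and 51252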

If instead you want the latest stoptime over the whole table, without partitioning, you can remove the partitionBy and use dense_rank instead (identical values get the same rank):

# No partitionBy: rank over the whole table by stoptime, newest first
w = Window.orderBy(psf.desc("stoptime"))

df.withColumn("rn", psf.dense_rank().over(w)).filter("rn = 1").drop("rn").show()

    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    |1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
    +----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

I want the rows with the same "startime, hour, min, sec, sip, dip, sport, dport, proto" but with the latest 'stoptime'. That in turn will give me the maximum byt and pkt. - user2825083
If you need the maximum stoptime value for each distinct startime, hour, min, sec, sip, dip, sport, dport, proto, then just put all of these columns in the partitionBy. - MaFF
Does this code look correct? import pyspark.sql.functions as psf from pyspark.sql import Window w = Window.partitionBy("startime","hour","sip","dip","sport","dport","proto").orderBy(psf.desc("stoptime")) df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn") df.show() - user2825083
Looks fine, but check the output to confirm it matches what you expect. - MaFF
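For readability, here is the code from user2825083's comment above, laid out as a script (dataframe is assumed to be the OP's original DataFrame):

import pyspark.sql.functions as psf
from pyspark.sql import Window

# One partition per (startime, hour, sip, dip, sport, dport, proto) combination,
# ordered so that the most recent stoptime comes first
w = Window.partitionBy("startime", "hour", "sip", "dip", "sport", "dport", "proto") \
          .orderBy(psf.desc("stoptime"))

# Keep only the latest row of each partition
df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")
df.show()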

0
from pyspark.sql.functions import col

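# Rebuild the sample data from the question as a DataFrame (sc is the active SparkContext)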
df = sc.parallelize([(1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801216,16,20,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801216,16,20,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504766585,1504801336,16,22,16,'192.168.0.11','23.204.108.58',51249,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0),
                     (1504781751,1504801336,16,22,16,'192.168.0.11','23.72.38.96',  51252,80,6,0,0)]).\
    toDF(["startime","stoptime","hour","min","sec","sip","dip","sport","dport","proto","pkt","byt"])

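# Keep only rows whose stoptime equals the global maximum (found via the RDD), then drop exact duplicates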
df1 = df.where(col("stoptime") == df.select("stoptime").rdd.max()[0]).distinct()
df1.show()

The output is

+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|  startime|  stoptime|hour|min|sec|         sip|          dip|sport|dport|proto|pkt|byt|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+
|1504766585|1504801336|  16| 22| 16|192.168.0.11|23.204.108.58|51249|   80|    6|  0|  0|
|1504781751|1504801336|  16| 22| 16|192.168.0.11|  23.72.38.96|51252|   80|    6|  0|  0|
+----------+----------+----+---+---+------------+-------------+-----+-----+-----+---+---+

@Marie, if you don't mind, could you explain a bit more? I don't see any non-Spark objects being used here. - Prem
@Marie I think we should always say something meaningful and be ready to defend it before posting any theoretical assumption (e.g., first talking about floats, then integers, then RDD vs. DF). Shouldn't we focus on helping community members instead of this kind of back-and-forth? (By the way, this benchmark might interest you.) Please don't intimidate the OP with so many comments. Be a good sport!!! - Prem
Oh no, my message may have come across the wrong way... I wasn't trying to put you down, I was just trying to help. I spent time trying to find a reproducible example for the 'float' issue, which is a problem I had to solve for one of my colleagues, but I couldn't reproduce it. A few days ago I also came across this thread https://stackoverflow.com/questions/46122846/pyspark-inconsistency-in-converting-timestamp-to-integer-in-dataframe, so I know other people have run into this conversion problem. I was only suggesting not to go back and forth between Python and Spark if it can be avoided. Sorry again. - MaFF
@user2825083 This approach works, but will it give the max-value row within 'each' group? - Prem
But in your original post your tone sounded different. Looks like a case of bad requirements gathering :) - I thought you only wanted the rows with the latest 'stoptime' (regardless of group!!!). I suggest you update the question properly and add the expected output, so other users looking for a solution don't get confused. Thanks! - Prem
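Following MaFF's suggestion above about not going back and forth between Python and Spark, the same filter can also be kept entirely in the DataFrame API (a minimal sketch, assuming the df built in this answer):

from pyspark.sql import functions as F

# Compute the maximum stoptime as a one-row DataFrame instead of collecting it via rdd.max()
max_stop = df.agg(F.max("stoptime").alias("max_stoptime"))

# Keep only rows whose stoptime equals the global maximum, then drop exact duplicates
df1 = (df.join(max_stop, df["stoptime"] == max_stop["max_stoptime"])
         .drop("max_stoptime")
         .distinct())
df1.show()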
