I have a PySpark dataframe with starttime and stoptime columns, plus other columns whose values get updated over time.
|startime |stoptime |hour |minute |sec |sip |dip |sport|dport|proto|pkt |byt |
|1504766585|1504801216|16 |20 |16 |192.168.0.11 |23.204.108.58 |51249|80 |6 |0 |0 |
|1504766585|1504801216|16 |20 |16 |192.168.0.11 |23.204.108.58 |51249|80 |6 |0 |0 |
|1504781751|1504801216|16 |20 |16 |192.168.0.11 |23.72.38.96 |51252|80 |6 |0 |0 |
|1504781751|1504801216|16 |20 |16 |192.168.0.11 |23.72.38.96 |51252|80 |6 |0 |0 |
|1504766585|1504801336|16 |22 |16 |192.168.0.11 |23.204.108.58 |51249|80 |6 |0 |0 |
|1504766585|1504801336|16 |22 |16 |192.168.0.11 |23.204.108.58 |51249|80 |6 |0 |0 |
|1504781751|1504801336|16 |22 |16 |192.168.0.11 |23.72.38.96 |51252|80 |6 |0 |0 |
|1504781751|1504801336|16 |22 |16 |192.168.0.11 |23.72.38.96 |51252|80 |6 |0 |0 |
In this example, I want to select all rows with the latest stoptime; the values in all the other columns are duplicates.
If you want the maximum stoptime value for each combination of startime, hour, min, sec, sip, dip, sport, dport, proto, simply put all of those columns into partitionBy. - MaFF

import pyspark.sql.functions as psf
from pyspark.sql import Window

w = Window.partitionBy("startime", "hour", "sip", "dip", "sport", "dport", "proto").orderBy(psf.desc("stoptime"))
df = dataframe.withColumn("rn", psf.row_number().over(w)).filter("rn = 1").drop("rn")
df.show()
- user2825083
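For intuition, the same "keep the latest row per key" logic can be sketched in plain Python without a Spark session. This is only an illustration of what partitionBy(keys).orderBy(desc("stoptime")) plus row_number() == 1 computes; the row dicts, the latest_per_key helper, and the reduced two-column key are hypothetical stand-ins for the real dataframe.

```python
# Sample rows mimicking the dataframe above (key columns trimmed to
# startime/sport for brevity; stoptime is the ordering column).
rows = [
    {"startime": 1504766585, "stoptime": 1504801216, "sport": 51249},
    {"startime": 1504766585, "stoptime": 1504801336, "sport": 51249},
    {"startime": 1504781751, "stoptime": 1504801216, "sport": 51252},
    {"startime": 1504781751, "stoptime": 1504801336, "sport": 51252},
]

def latest_per_key(rows, keys, order_col="stoptime"):
    """Keep only the row with the largest order_col value per key tuple,
    mirroring partitionBy(*keys).orderBy(desc(order_col)) + row_number == 1."""
    best = {}
    for r in rows:
        k = tuple(r[c] for c in keys)
        if k not in best or r[order_col] > best[k][order_col]:
            best[k] = r
    return list(best.values())

result = latest_per_key(rows, keys=["startime", "sport"])
```

Each (startime, sport) group collapses to its single latest-stoptime row, which is exactly the effect of the row_number filter in the answer.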