在pyspark中,我有一个数据框,其中行基于id和k1的值进行排序。此外,每行都分配了一个唯一的递增数字(rowid)。
对于每个唯一的id值,我想要计算当k1==k2时第一行的rowid与该id记录第一次出现的行的rowid + 1之间的差异,并将结果存储在一个新列(即rank)中。输出应该像下面这样。
例如,对于id = 1,在rowid = 2时,k1 == k2的值。第一次观察到id = 1是在rowid = 1时。将2-1 + 1 = 2放入排名列中。对于id = 3,我们没有任何记录与列k1和k2的值匹配。因此,用0(或null)填充排名列。我认为这涉及基于id的groupBy,但我不确定如何获得与列k1和k2相匹配的行对应的索引以及每个唯一id对应的第一个rowid。
-----------------------
rowid | id | k1 | k2 |
-----------------------
1 | 1 | v1 | l1 |
2 | 1 | v1 | v1 |
3 | 1 | v1 | l2 |
4 | 2 | v2 | v2 |
5 | 2 | v2 | l3 |
6 | 3 | v3 | l3 |
----------------------
对于每个唯一的id值,我想要计算当k1==k2时第一行的rowid与该id记录第一次出现的行的rowid + 1之间的差异,并将结果存储在一个新列(即rank)中。输出应该像下面这样。
----------------
id | k1 |rank |
-----------------
1 | v1 | 2 |
2 | v2 | 1 |
3 | v3 | 0 |
-----------------
例如,对于id = 1,在rowid = 2时,k1 == k2的值。第一次观察到id = 1是在rowid = 1时。将2-1 + 1 = 2放入排名列中。对于id = 3,我们没有任何记录与列k1和k2的值匹配。因此,用0(或null)填充排名列。我认为这涉及基于id的groupBy,但我不确定如何获得与列k1和k2相匹配的行对应的索引以及每个唯一id对应的第一个rowid。
df2 = df.groupBy("id").agg(fn.min("rowid").alias("minRowId")) rank = df.join(df2, df.id == df2.id, how='full').drop(df2.id) rank = rank.withColumn("diff", fn.when(fn.col("k1")==fn.col("k2"), rank.rowid - rank.minRowId + 1))
- user3192082udf
可能会比较慢,而join
则需要更多的内存空间。这取决于您的个人偏好。 - mayank agrawal