通过搜索和条件找到两列值之间的差异

5
在pyspark中,我有一个数据框,其中行基于id和k1的值进行排序。此外,每行都分配了一个唯一的递增数字(rowid)。
-----------------------
rowid | id | k1  | k2 |
-----------------------
1     | 1  | v1 | l1  |
2     | 1  | v1 | v1  |
3     | 1  | v1 | l2  |
4     | 2  | v2 | v2  |
5     | 2  | v2 | l3  |
6     | 3  | v3 | l3  |
----------------------

对于每个唯一的id值,我想要计算当k1==k2时第一行的rowid与该id记录第一次出现的行的rowid + 1之间的差异,并将结果存储在一个新列(即rank)中。输出应该像下面这样。
----------------
 id | k1  |rank |
-----------------
 1  | v1  | 2   |
 2  | v2  | 1   |
 3  | v3  | 0   | 
-----------------

例如,对于id = 1,在rowid = 2时,k1 == k2的值。第一次观察到id = 1是在rowid = 1时。将2-1 + 1 = 2放入排名列中。对于id = 3,我们没有任何记录与列k1和k2的值匹配。因此,用0(或null)填充排名列。我认为这涉及基于id的groupBy,但我不确定如何获得与列k1和k2相匹配的行对应的索引以及每个唯一id对应的第一个rowid。
2个回答

1
首先创建一个示例数据框:
import pyspark.sql.functions as F
from pyspark.sql.types import *

df = sql.createDataFrame([
            (1, 1, 'v1' , 'l1'),
            (2, 1, 'v1' , 'v1'),
            (3, 1, 'v1' , 'l2'),
            (4, 2, 'v2' , 'v2'),
            (5, 2, 'v2' , 'l3'),
            (6, 3, 'v3' , 'l3'),
            ],[
            'rowid', 'id', 'k1', 'k2'])

创建一个UDF并将其应用于列,
def get_rank_udf(rows):
    rows = sorted(rows, key=lambda x: x['rowid'])
    first_row_id = rows[0]['rowid']
    for _r in rows:
        if _r['k1'] == _r['k2']:
            equal_row_id = _r['rowid']
            break
        else:
            equal_row_id = None

    if equal_row_id is None:
        return 0
    return equal_row_id - first_row_id + 1

get_rank = F.udf(lambda x: get_rank_udf(x), IntegerType())

df = df.groupby('id', 'k1').agg(F.collect_list(F.struct('rowid', 'k1', 'k2')).alias('elements'))\
       .withColumn('rank', get_rank(F.col('elements')))\
       .select('id', 'k1', 'rank')

这段文字的英译中文是:

这将输出:

+---+---+----+                                                                  
| id| k1|rank|
+---+---+----+
|  1| v1|   2|
|  2| v2|   1|
|  3| v3|   0|
+---+---+----+

谢谢,我也可以按照以下方式解决,但您的解决方案更优雅 :) 您知道哪个更有效率吗?我是说join vs udf?df2 = df.groupBy("id").agg(fn.min("rowid").alias("minRowId")) rank = df.join(df2, df.id == df2.id, how='full').drop(df2.id) rank = rank.withColumn("diff", fn.when(fn.col("k1")==fn.col("k2"), rank.rowid - rank.minRowId + 1)) - user3192082
2
udf 可能会比较慢,而 join 则需要更多的内存空间。这取决于您的个人偏好。 - mayank agrawal

1
你可以使用API函数并在idk1上使用groupBy,这比使用udf更快(请参考)
import pyspark.sql.functions as f

df.groupBy("id", "k1")\
    .agg(
        f.min(f.when(f.col("k1")==f.col("k2"), f.col("rowid"))).alias("first_equal"),
        f.min("rowid").alias("first_row")
    )\
    .select("id", "k1", (f.col("first_equal")-f.col("first_row")+1).alias("rank"))\
    .fillna(0)\
    .show()
#+---+---+----+
#| id| k1|rank|
#+---+---+----+
#|  1| v1|   2|
#|  2| v2|   1|
#|  3| v3|   0|
#+---+---+----+

计算rank可分为两个聚合步骤:

  • 第一个聚合步骤获取每个id, k1对应的最小rowid,其中满足k1==k2
  • 第二个聚合步骤获取每个id, k1对应的最小rowid

将这两个值的差(按照您的要求添加+1),并最后用0替换任何null值。


更新:使用 row_number 的另一种替代方法:
from pyspark.sql import Window

# you can define your own order by column
w = Window.partitionBy("id", "k1").orderBy("rowid")

df.withColumn("rank", f.when(f.expr("k1 = k2"), f.row_number().over(w)))\
    .groupBy("id", "k1")\
    .agg(f.min("rank"))\
    .fillna(0)\
    .show()
# Same as above

太好了!如果“rowid”列不可用,你有想过其他解决方法吗?我故意添加这个问题是因为我认为这会使问题更容易解决。 - user3192082
@user3192082 这是你要找的吗?我不确定我理解了你的意思。 - pault
我已经使用这个来为每行分配一个序列索引。我的意思是,如果我想避免这种方法(即数据框没有行ID列),我们仍然可以解决这个问题并计算排名吗? - user3192082
@user3192082,如果你有一种方法可以对每个分组内的行进行排序,那么就可以实现。根据您提供的数据,我没有看到明显的方法。例如,对于“id = 1”,为什么“l1”排在“v1”之前,而“v1”又排在“l2”之前?如果有一种方法可以对它们进行排序,您可能可以使用pyspark.sql.functions.rankpyspark.sql.functions.row_number - pault
1
@user3192082,我已经添加了一个示例来展示您如何执行此操作。您必须定义窗口函数以按适当的方式对行进行排序。在这里,我使用rowid,但您可以使用任何方法。 (请记住,Spark数据框架本质上是无序的,因此很难依赖数据在源系统中出现的顺序)。 - pault

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接