PySpark - difference between two dataframes - identifying inserts, updates and deletes


I have two dataframes, df1 (old) and df2 (new). I want to compare df2 against df1 and find the newly added rows, the deleted rows, the updated rows, and the names of the columns that were updated.

Here is the code I have written:

from pyspark.sql.functions import col, array, when, array_remove, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("middlename", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
# For each non-key column, emit its name when the values differ, otherwise an empty string
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[df2[c] for c in df2.columns if c not in ['firstname','middlename','lastname']], 
                array_remove(array(*conditions_), "").alias("updated_columns")
]

df1.join(df2, ["firstname","middlename","lastname"],"inner").select(*select_expr).show()

Here is the output I get:
+---------+----------+--------+-----+------+------+---------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|
+---------+----------+--------+-----+------+------+---------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|
+---------+----------+--------+-----+------+------+---------------+

And here is the output I expect:

+---------+----------+--------+-----+------+------+---------------+-----------------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|           status|
+---------+----------+--------+-----+------+------+---------------+-----------------+
|    James|       rob|   Smith|36636|     M|  3000|             []|        unchanged|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|          updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|          deleted|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|          updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|            added|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|        unchanged|
+---------+----------+--------+-----+------+------+---------------+-----------------+

I know that I can find the added and deleted rows separately using left anti joins. However, I'm looking for a way to update the existing join to get the output shown above.
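For reference, the left-anti-join version I mean looks roughly like this, keyed on the three name columns:

keys = ["firstname", "middlename", "lastname"]

# Rows present in df2 but not in df1 (added), and vice versa (deleted)
added_rows = df2.join(df1, keys, "left_anti")
deleted_rows = df1.join(df2, keys, "left_anti")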


You might want to use df1.subtract(df2) to get the differences. I believe what you want is within reach from there. You could also use hdfs/ranger audit logs to get this information, just to give you another option. - Matt Andruff
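A minimal sketch of the subtract approach from the comment (note that subtract is a whole-row set difference, so a row whose values changed shows up in both results):

removed_or_changed = df1.subtract(df2)  # rows in df1 with no identical row in df2
added_or_changed = df2.subtract(df1)    # rows in df2 with no identical row in df1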
2 Answers


Using an outer join will help solve your problem. I have modified the code you provided to include the status column.

Minimal working example

from pyspark.sql.functions import col, array, when, array_remove, lit, size, coalesce
from pyspark.sql.types import *

data1 = [("James","rob","Smith","36636","M",3000),
    ("Michael","Rose","jim","40288","M",4000),
    ("Robert","dunkin","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]

data2 = [("James","rob","Smith","36636","M",3000),
    ("Robert","dunkin","Williams","42114","M",2000),
    ("Maria","Anne","Jones","72712","F",3000),
    ("Yesh","Reddy","Brown","75234","M",3000),
    ("Jen","Mary","Brown","60563","F",-1)
  ]
schema = StructType([
    StructField("firstname",StringType(),True),
    StructField("middlename",StringType(),True),
    StructField("lastname",StringType(),True),
    StructField("id", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("salary", IntegerType(), True)
  ])
 
df1 = spark.createDataFrame(data=data1,schema=schema)
df2 = spark.createDataFrame(data=data2,schema=schema)
conditions_ = [when(df1[c]!=df2[c], lit(c)).otherwise("") for c in df1.columns if c not in ['firstname','middlename','lastname']]


The logic for the status column, and the modified select_expr, which now coalesces values from df2 and df1, giving preference to df2 so the latest updates win:
status = (when(df1["id"].isNull(), lit("added"))
          .when(df2["id"].isNull(), lit("deleted"))
          .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
          .otherwise("unchanged"))

select_expr =[
                col("firstname"),col("middlename"),col("lastname"),
                *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in ['firstname','middlename','lastname']],                
                array_remove(array(*conditions_), "").alias("updated_columns"),
                status.alias("status"),
]

Finally, apply the outer join:
df1.join(df2, ["firstname","middlename","lastname"],"outer").select(*select_expr).show()

Output

+---------+----------+--------+-----+------+------+---------------+---------+
|firstname|middlename|lastname|   id|gender|salary|updated_columns|   status|
+---------+----------+--------+-----+------+------+---------------+---------+
|    James|       rob|   Smith|36636|     M|  3000|             []|unchanged|
|      Jen|      Mary|   Brown|60563|     F|    -1|             []|unchanged|
|    Maria|      Anne|   Jones|72712|     F|  3000|   [id, salary]|  updated|
|  Michael|      Rose|     jim|40288|     M|  4000|             []|  deleted|
|   Robert|    dunkin|Williams|42114|     M|  2000|       [salary]|  updated|
|     Yesh|     Reddy|   Brown|75234|     M|  3000|             []|    added|
+---------+----------+--------+-----+------+------+---------------+---------+

Is there a way to find the status without using the "id" column? - yAsH
@yAsH, you can use a combined condition over the columns used in the join. So the condition would be when(df1["firstname"].isNull() & df1["middlename"].isNull() & df1["lastname"].isNull(), lit("added")) and so on. - Nithish
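Spelled out, the condition from that comment might look like this (a sketch only, assuming the same outer join on the three name columns and the conditions_ list from above):

from pyspark.sql.functions import when, lit, size, array, array_remove

# "added": no match on the df1 side of the outer join; "deleted": no match on df2's side
added = (df1["firstname"].isNull() & df1["middlename"].isNull()
         & df1["lastname"].isNull())
deleted = (df2["firstname"].isNull() & df2["middlename"].isNull()
           & df2["lastname"].isNull())

status = (when(added, lit("added"))
          .when(deleted, lit("deleted"))
          .when(size(array_remove(array(*conditions_), "")) > 0, lit("updated"))
          .otherwise(lit("unchanged")))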


I suggest using Ranger so you can capture what actually changed and when. But if these dataframes are all you have...

You want an "outer" join (combining all the data from both tables into one). You already have the logic for the updated columns.

For the status: rows in df1 but not in df2 are "deleted", and rows in df2 but not in df1 are "added"; if a row has any updated columns it is "updated", otherwise "unchanged".
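A minimal sketch of that recipe, reusing df1, df2, and conditions_ from the question (this mirrors the accepted answer above):

from pyspark.sql.functions import col, coalesce, array, array_remove, size, when, lit

keys = ["firstname", "middlename", "lastname"]
updated_columns = array_remove(array(*conditions_), "")

status = (when(df1["id"].isNull(), lit("added"))      # row exists in df2 only
          .when(df2["id"].isNull(), lit("deleted"))   # row exists in df1 only
          .when(size(updated_columns) > 0, lit("updated"))
          .otherwise(lit("unchanged")))

(df1.join(df2, keys, "outer")
    .select(*keys,
            *[coalesce(df2[c], df1[c]).alias(c) for c in df2.columns if c not in keys],
            updated_columns.alias("updated_columns"),
            status.alias("status"))
    .show())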


What is Ranger? - Steven
Ranger is a security framework for Hadoop: https://ranger.apache.org/. It tracks who accessed what and when. There are other ways to trace who did what and when by combing through Spark logs, but Ranger is much cleaner. - Matt Andruff
