无论是 Pandas 还是 PySpark 中的解决方案都可以,我对逻辑很感兴趣。
我有两个数据框:
df_1 =
id_1 id_2 value
ABC XYZ AA+
ABA XYY null
ABD YYZ null
ABD ZYZ A+
ABB XYY AA-
ACC XZY A--
BBB YYY null
df_2 =
id_1 id_2 value
ABC XYZ AA+
ABA XYY CCC
ABD YYZ UNDEF
ABD ZYZ A-
ABB XYY AA-
如何检查在df_2
中的每对id_1
和id_2
都存在于df_1
中并具有相同的value
?我想追踪问题行,找出不匹配的行:
expected_output =
id_1 id_2 value value_actual
ABA XYY null CCC
ABD YYZ null UNDEF
ABD ZYZ A+ A-
我一开始想要一个可以检查多个属性的结果,所以使自己过于复杂化,写了以下代码:
def matching_spark(df_1, df_2):
df_1 = df_1.withColumn('index', row_number().over(Window.partitionBy().orderBy('id_1','id_2')))
df_2 = df_2.withColumn('index', row_number().over(Window.partitionBy().orderBy('id_1','id_2')))
s = df_1.unionByName(df_2).orderBy('id_1','id_2','index')
s = s.withColumn('change',array('value'))
s = s.withColumn('cols',split(lit('value'),'\,'))
s= s.withColumn('change1',last('change').over(Window.partitionBy('index').orderBy('id_1','id_2'))).where(col('change')!=col('change1'))
s = s.withColumn('change2', expr("transform(change,(c,i)->change[i]!=change1[i])")).withColumn('faulty_attr',expr('filter(cols,(x,j)->(change2[j]))')).drop('index','change','cols','change1' ,'change2')
return s
但是没有输出任何内容,我相信对于我上面的例子,有一个简单的连接解决方案。