I have two spark dataframes:
Dataframe A:
|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |
并且DataFrame B:
|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |
Dataframe B可能包含来自Dataframe A的重复、更新和新行。我想在Spark中编写一个操作,可以创建一个新的数据帧,其中包含来自Dataframe A以及Dataframe B中的更新和新行。
我首先创建了一个哈希列,其中只包含不可更新的列。这是唯一的ID。所以假设col1
和col2
的值可以更改(可以更新),但col3,..,coln
是唯一的。我已经创建了一个哈希函数hash(col3,..,coln)
:
A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))
现在我想编写一些Spark代码,基本上是从B中选择哈希不在A中的行(因此是新行和更新的行),并将它们与A中的行一起连接成一个新的数据框。如何在Pyspark中实现这一点?
编辑: 数据框B可以具有来自数据框A的额外列,因此联合不可行。
示例:
数据框A:
+-----+-----+
|col_1|col_2|
+-----+-----+
| a| www|
| b| eee|
| c| rrr|
+-----+-----+
数据帧B:
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
| a| wew| 1|
| d| yyy| 2|
| c| rer| 3|
+-----+-----+-----+
结果:数据框 C:
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
| a| wew| 1|
| b| eee| null|
| c| rer| 3|
| d| yyy| 2|
+-----+-----+-----+