如何使用另一个dataframe中的新值更新pyspark dataframe?

4

I have two spark dataframes:

Dataframe A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

并且DataFrame B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

Dataframe B可能包含来自Dataframe A的重复、更新和新行。我想在Spark中编写一个操作,可以创建一个新的数据帧,其中包含来自Dataframe A以及Dataframe B中的更新和新行。

我首先创建了一个哈希列,其中只包含不可更新的列。这是唯一的ID。所以假设col1col2的值可以更改(可以更新),但col3,..,coln是唯一的。我已经创建了一个哈希函数hash(col3,..,coln)

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在我想编写一些Spark代码,基本上是从B中选择哈希不在A中的行(因此是新行和更新的行),并将它们与A中的行一起连接成一个新的数据框。如何在Pyspark中实现这一点?

编辑: 数据框B可以具有来自数据框A的额外列,因此联合不可行。

示例:

数据框A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据帧B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果:数据框 C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

你是否在寻找类似于这个答案的东西?与其使用哈希,更好的方法是使用唯一标识进行连接。 - pault
可能是更新数据框列的新值的重复问题。 - pault
这与那个答案不同,因为对我来说,我还需要保留来自数据帧B的新行。 - djWann
我需要一个哈希列,因为我没有唯一的ID列。 - djWann
你可以根据多个列进行连接,这应该相当于哈希算法,但是根据你的问题很难判断。你能否提供一些小样本输入/期望输出,并且能够重现的示例呢?请参考如何创建好的可重复Apache Spark DataFrame示例 - pault
显示剩余3条评论
3个回答

4
这与使用新值更新数据帧列密切相关,但您还想添加来自DataFrame B的行。一种方法是首先按链接问题中概述的方式操作,然后将结果与DataFrame B联合并删除重复项。

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

如果你有很多要替换的列,而且不想硬编码它们,可以更通用地使用列表推导式:
cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()

这可以使用Coalesce简化吗? - Paul Brannan
@PaulBrannan 可能是这样。随意进行[编辑]或发布您自己的答案。 - pault

2
我会选择不同的解决方案,我认为它更简洁、更通用,不需要列出所有列。我会首先通过基于键列(列表)的内部连接来确定将要更新的dfA子集(replaceDf)。然后,我会从dfA中减去这个replaceDF,并将其与dfB合并。"Original Answer"可以翻译为"最初的回答"。
    replaceDf = dfA.alias('a').join(dfB.alias('b'), on=keyCols, how='inner').select('a.*')
    resultDf = dfA.subtract(replaceDf).union(dfB).show()

尽管dfA和dfB中会有不同的列,但是您仍然可以通过获取两个数据帧的列列表并找到它们的并集来克服这一点。然后我会准备一个选择查询(而不是“select.('a.')*”),以便我只列出存在于dfB中的dfA列+“null as colname”对于那些不存在于dfB中的列。"最初的回答"

0
如果您想保留唯一值并要求严格正确的结果,则应使用union后跟dropDuplicates来完成操作:
columns_which_dont_change = [...]
old_df.union(new_df).dropDuplicates(subset=columns_which_dont_change)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接