PySpark:使用另一个dataframe插入或更新数据

7
我有两个数据框DF1和DF2。DF1是主要的,DF2是增量的。来自DF2的数据应该插入或用于更新DF1数据。
假设DF1的格式如下:
ID编号 | 开始日期 | 数量 | 天数 ---|---|---|--- 1 | 2016-01-01 | 4650 | 22 2 | 2016-01-02 | 3130 | 45 1 | 2016-01-03 | 4456 | 22 2 | 2016-01-15 | 1234 | 45
DF2包含以下内容:
ID编号 | 开始日期 | 数量 | 天数 ---|---|---|--- 1 | 2016-01-01 | 8650 | 52 2 | 2016-01-02 | 7130 | 65 1 | 2016-01-06 | 3456 | 20 2 | 2016-01-20 | 2345 | 19 3 | 2016-02-02 | 1345 | 19
我需要将两个数据框组合在一起,这样如果DF2的“ID编号”和“开始日期”与DF1匹配,则应替换为DF1中的数据;如果不匹配,则应将其插入到DF1中。"id_no"不是唯一的。
期望结果:
编号 起始日期 金额 天数
1 2016年01月01日 8650元 52天
2 2016年01月02日 7130元 65天
1 2016年01月03日 4456元 22天
2 2016年01月15日 1234元 45天
1 2016年01月06日 3456元 20天
2 2016年01月20日 2345元 19天
3 2016年02月02日 1345元 19天
3个回答

12

你可以根据 id_nostart_date 将两个数据框联接起来,然后使用 coalesce 函数将 amountdays 列与来自 df2 的列合并,df2 列在前:

import pyspark.sql.functions as f

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    f.coalesce('b.amount', 'a.amount').alias('amount'), 
    f.coalesce('b.days', 'a.days').alias('days')
).show()

+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

如果您有更多的列:

cols = ['amount', 'days']

df1.alias('a').join(
    df2.alias('b'), ['id_no', 'start_date'], how='outer'
).select('id_no', 'start_date', 
    *(f.coalesce('b.' + col, 'a.' + col).alias(col) for col in cols)
).show()
+-----+----------+------+----+
|id_no|start_date|amount|days|
+-----+----------+------+----+
|    1|2016-01-06|  3456|  20|
|    2|2016-01-20|  2345|  19|
|    1|2016-01-03|  4456|  22|
|    3|2016-02-02|  1345|  19|
|    2|2016-01-15|  1234|  45|
|    1|2016-01-01|  8650|  52|
|    2|2016-01-02|  7130|  65|
+-----+----------+------+----+

谢谢您的回答。如果数据框包含25个以上的列,我应该对这25个列都使用coalesce函数吗? - navin
你可以通过循环遍历列列表,在select中使用*语法,coalesce df2df1来编程实现它。 - Psidom

1

union应该可以做到,如果两个dfs具有相同的结构。

from pyspark.sql import functions as F
grp_by = {'id_no', 'start_date'}
df = df2.union(df1)
df = df.groupby(*grp_by).agg(*[F.first(c).alias(c) for c in set(df.columns)-grp_by])
df.show()
#     +-----+----------+----+------+
#     |id_no|start_date|days|amount|
#     +-----+----------+----+------+
#     |    1|2016-01-06|  20|  3456|
#     |    2|2016-01-20|  19|  2345|
#     |    1|2016-01-03|  22|  4456|
#     |    3|2016-02-02|  19|  1345|
#     |    2|2016-01-15|  45|  1234|
#     |    1|2016-01-01|  52|  8650|
#     |    2|2016-01-02|  65|  7130|
#     +-----+----------+----+------+

0
我正在研究这个问题。看起来Spark 支持SQL的MERGE INTO,这对于这个任务应该很好。您只需要创建一个新的ID,它是id_no和start_date的联接即可。
MERGE INTO df1
USING df2
ON df1.new_id = df2.new_id
WHEN MATCHED THEN
  UPDATE SET df1.amount = df2.amount, df1.days = df2.days
WHEN NOT MATCHED
  THEN INSERT *

1
等一下,这只适用于 Delta 表,对吧?不适用于数据帧。 - OrganicMustard

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接