Left outer join on multiple columns with PySpark

3

I am using PySpark 2.1.0.

I am trying to perform a left outer join on two DataFrames.

I have two DataFrames whose schemas look like this:

crimes
 |-- CRIME_ID: string (nullable = true)
 |-- YEAR_MTH: string (nullable = true)
 |-- CRIME_TYPE: string (nullable = true)
 |-- CURRENT_OUTCOME: string (nullable = true)

outcomes
 |-- CRIME_ID: string (nullable = true)
 |-- YEAR_MTH: string (nullable = true)
 |-- FINAL_OUTCOME: string (nullable = true)

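I need to join crimes to outcomes with a left outer join, because one crime can have many outcomes. I would like to exclude the columns that are common to both DataFrames.

I have tried the following two approaches, but each one produces various errors: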

cr_outs = crimes.join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
 .select(['crimes.'+c for c in crimes.columns] + ['outcomes.FINAL_OUTCOME'])

 from pyspark.sql.functions as fn    
 cr_outs = crimes.alias('a').join(outcomes.alias('b'), fn.col('b.CRIME_ID') = fn.col('a.CRIME_ID') ,'left_outer')\
  .select([fn.col('a.'+ c) for c in a.columns] + b.FINAL_OUTCOME)

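Can anyone suggest an alternative approach? Thanks.

Do you want to include the YEAR_MTH column in the join as well? (join...) - muon
The one-to-many relationship is based on CRIME_ID only. - alortimor
3 Answers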

2

This did the trick. It looks like you have to use an alias, similar to what has been posted before, only slightly simpler in PySpark 2.1.0.

from pyspark.sql.functions import col

cr_outs = crimes.alias('a')\
  .join(outcomes, crimes.CRIME_ID == outcomes.CRIME_ID, 'left_outer')\
  .select(*[col('a.'+c) for c in crimes.columns]
          + [outcomes.FINAL_OUTCOME])

cr_outs.show()
cr_outs.printSchema()

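+--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+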
|            CRIME_ID|YEAR_MTH|         REPORTED_BY|        FALLS_WITHIN|LONGITUDE| LATITUDE|            LOCATION|LSOA_CODE|          LSOA_NAME|          CRIME_TYPE|     CURRENT_OUTCOME|       FINAL_OUTCOME|
+--------------------+--------+--------------------+--------------------+---------+---------+--------------------+---------+-------------------+--------------------+--------------------+--------------------+
|426085c2ed33af598...| 2017-01|City of London Po...|City of London Po...|-0.086051| 51.51357|On or near Finch ...|E01032739|City of London 001F|         Other theft|Investigation com...|Investigation com...|
|33a3ddb8160a854a4...| 2017-01|City of London Po...|City of London Po...|-0.077777|51.518047|On or near Sandy'...|E01032
..
..
..
root
 |-- CRIME_ID: string (nullable = true)
 |-- YEAR_MTH: string (nullable = true)
 |-- REPORTED_BY: string (nullable = true)
 |-- FALLS_WITHIN: string (nullable = true)
 |-- LONGITUDE: float (nullable = true)
 |-- LATITUDE: float (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- LSOA_CODE: string (nullable = true)
 |-- LSOA_NAME: string (nullable = true)
 |-- CRIME_TYPE: string (nullable = true)
 |-- CURRENT_OUTCOME: string (nullable = true)
 |-- FINAL_OUTCOME: string (nullable = true)

As you can see, there are many more columns than in my original post, but there are no duplicate columns and no renamed columns :-)


0

You can use the following function to drop the duplicate columns.

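def dropDupeDfCols(df):
    newcols = []   # column names, first occurrence only
    dupcols = []   # positions of repeated column names

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # Rename every column to its position so duplicates can be dropped unambiguously
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    # Restore the original names on the remaining columns
    return df.toDF(*newcols)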
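The bookkeeping inside that function can be tried without Spark. The sketch below applies the same first-occurrence rule to a plain list of column names. The helper name `split_duplicate_columns` is hypothetical, and the list assumes the column order that a plain join of `crimes` and `outcomes` would produce.

```python
def split_duplicate_columns(columns):
    """Return (names kept at first occurrence, positions of repeated names)."""
    kept, dup_positions = [], []
    for position, name in enumerate(columns):
        if name in kept:
            dup_positions.append(position)
        else:
            kept.append(name)
    return kept, dup_positions


# Assumed column order after joining crimes with outcomes without a select.
joined_columns = ["CRIME_ID", "YEAR_MTH", "CRIME_TYPE", "CURRENT_OUTCOME",
                  "CRIME_ID", "YEAR_MTH", "FINAL_OUTCOME"]

kept, dup_positions = split_duplicate_columns(joined_columns)
print(kept)           # ['CRIME_ID', 'YEAR_MTH', 'CRIME_TYPE', 'CURRENT_OUTCOME', 'FINAL_OUTCOME']
print(dup_positions)  # [4, 5]
```

Because the later copy is always the one discarded, the `CRIME_ID` and `YEAR_MTH` values that survive are the ones from the left-hand DataFrame, which is the desired result for this left outer join.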

0

You can temporarily rename the common columns to remove the ambiguity.

crimes = crimes\
.withColumnRenamed('CRIME_ID','CRIME_ID_1')\
.withColumnRenamed('YEAR_MTH','YEAR_MTH_1')


required_columns = [c for c in crimes.columns] + ['FINAL_OUTCOME']

cr_outs = crimes\
.join(outcomes, crimes.CRIME_ID_1 == outcomes.CRIME_ID, 'left_outer')\
.select(required_columns)
