将Spark DataFrame模式转换为新模式

5
我有多个Spark作业,它们从不同的数据源读取,它们具有不同的模式,但非常接近,我想做的是将它们全部写入相同的Redshift表中,因此我需要统一所有DataFrame的模式,最好的方法是什么?
比如说第一个输入数据的架构是这样的:
  val schema1 = StructType(Seq(
    StructField("date", DateType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("platform", StringType),
    StructField("country", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType)
  ))

第二个输入源的模式如下:
  val schema2 = StructType(Seq(
    StructField("date", DateType),
    StructField("creator_id", StringType),
    StructField("creator_name", StringType),
    StructField("platform", StringType),
    StructField("views", DoubleType),
    StructField("installs", DoubleType),
    StructField("spend", DoubleType),
    StructField("ecpm", DoubleType)
  ))

表格模式(预期统一数据框):

  val finalSchema = StructType(Seq(
    StructField("date", DateType),
    StructField("account_name", StringType),
    StructField("adset_id", StringType),
    StructField("adset_name", StringType),
    StructField("campaign_id", StringType),
    StructField("campaign_name", StringType),
    StructField("pub_id", StringType),
    StructField("pub_name", StringType),
    StructField("creative_id", StringType),
    StructField("creative_name", StringType),
    StructField("platform", StringType),
    StructField("install_source", StringType),
    StructField("views", IntegerType),
    StructField("clicks", IntegerType),
    StructField("installs", IntegerType),
    StructField("cost", DoubleType)
  ))

正如您在最终架构中所看到的那样,我有一些列可能不在输入架构中,因此应为null,某些列名称也应更改。而像ecpm这样的某些列应该被删除。
2个回答

0
index列添加到两个dataframes中,并基于index进行join,以便进行一对一的映射。之后,从joineddataframe中仅select所需的columns
  1. If you have two dataframes like below

    // df1.show
    +-----+---+
    | name|age|
    +-----+---+
    |Alice| 25|
    |  Bob| 29|
    |  Tom| 26|
    +-----+---+
    
    //df2.show
    +--------+-------+
    |    city|country|
    +--------+-------+
    |   Delhi|  India|
    |New York|    USA|
    |  London|     UK|
    +--------+-------+
    
  2. Now add index columns and get one-to-one mapping

    import org.apache.spark.sql.functions._
    
    val df1Index=df1.withColumn("index1",monotonicallyIncreasingId)
    
    val df2Index=df2.withColumn("index2",monotonicallyIncreasingId)
    
    val joinedDf=df1Index.join(df2Index,df1Index("index1")===df2Index("index2"))
    
    //joinedDf
    
    +-----+---+------+--------+-------+------+
    | name|age|index1|    city|country|index2|
    +-----+---+------+--------+-------+------+
    |Alice| 25|     0|   Delhi|  India|     0|
    |  Bob| 29|     1|New York|    USA|     1|
    |  Tom| 26|     2|  London|     UK|     2|
    +-----+---+------+--------+-------+------+
    
现在您可以像下面这样编写查询:
val queryList=List(col("name"),col("age"),col("country"))
joinedDf.select(queryList:_*).show

//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25|  India|
|  Bob| 29|    USA|
|  Tom| 26|     UK|
+-----+---+-------+

你怎么知道它们是相关的? - thebluephantom
1
@thebluephantom 他没有提供数据,但需要实现一个连接以将两个数据框中的数据合并。我进行了一对一的映射。他也可以在共同列(如日期等)上应用连接。 - Manoj Kumar Dhakad
它们是独立的任务,我无法将DataFrames合并,每个任务都有自己的模式,但应该将统一版本写入数据库。 - Am1rr3zA

0

不确定是否有完全自动化的方法来实现这一点。如果您的模式是固定且不特别复杂的,您可以手动调整模式并使用 union 合并结果。

为了举例说明,假设您想从 frame1 中包含列 col1col2,并包括 frame2col2col4

import org.apache.spark.sql.functions._

val subset1 = frame1.select($"col1", $"col2", lit(null).as("col4"))
val subset2 = frame2.select(lit(null).as("col1"), $"col2", $"col4")
val result = subset1 union subset2

就是这样实现的。我们手动指定每个列,因此可以跳过任何不需要的列。


你的方法是我现在正在做的,但我希望能找到更好的方法。 - Am1rr3zA

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接