如何合并两个具有相同列数的数据框?

4

DataFrame df1 包含列:a,b,c,d,e(空数据集)

DataFrame df2 包含列:b,c,d,e,_c4(包含数据)

我想将这两个DataFrame联合起来。我尝试使用:

df1.union(df2);

这会将数据填充到位置。但我希望用列名填充数据。

然后我尝试了以下方法:

df1.unionByName(df2, allowMissingColumns= true);

但是在 ``allowMissingColumns= true` 时,会抛出错误。我知道这是由于版本原因造成的。我使用的Spark版本是2.4.4。

df1:

|a|b|c|d|e|
+---------+
| | | | | | 
+---------+

df2:

|b|c|d|e|_c4|
+-----------+
|2|3|5|6|   | 
+-----------+

预期输出:

|a|b|c|d|e|
+---------+
| |2|3|5|6| 
+---------+

我的问题是有没有其他方法使用列名覆盖一个空数据框(df1)以使用填充的数据框(df2)?或者我需要修改pom.xml文件中的版本号码?

请提供一些建议。

pom文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>rule</groupId>
  <artifactId>qwerty</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>qwerty</name>
  <description>code</description>
  <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>

        
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>4.0.0</version>
        </dependency>

   </dependencies>
   <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <outputDirectory>${project.build.directory}</outputDirectory>
                    <archive>
                        <manifest>
                            <mainClass>qwerty.qwerty</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin> 
                <artifactId>maven-compiler-plugin</artifactId> 
                <configuration> <source>1.8</source> <target>1.8</target> </configuration> 
            </plugin>
        </plugins>
    </build>
</project>
1个回答

4

unionByName 自 Spark 2.3 就存在了,但 allowMissingColumns 只出现在 Spark 3.1 中,因此您在 2.4 版本中出现错误。

在 Spark 2.4 中,您可以尝试自己实现相同的行为。也就是说,转换 df2 使其包含来自 df1 的所有列。如果某个列不在 df2 中,则可以将其设置为空值。在 scala 中,可以这样做:

val df2_as1 = df2
    .select(df1
        .columns
        .map(c => if(df2.columns.contains(c)) col(c) else lit(null).as(c))
    : _*)
// Here, union would work just as well.
val result = df1.unionByName(df2_as1)

在Java中,这显然更加令人痛苦:

List<String> df2_cols = Arrays.asList(df2.columns());
// cols is the list of columns contained in df1, but all columns
// that are not in df2 are set to null.
List<Column> cols = new ArrayList<>();
for (String c : df1.columns()) {
    if(df2_cols.contains(c))
          cols.add(functions.col(c));
    else
          cols.add(functions.lit(null).alias(c));
}
// We modify df2 so that its schema matches df1's.
Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());
        
// Here, union would work just as well.
Dataset<Row> result = df1.unionByName(df2_as1);

那很有帮助!但我无法在Java中实现相同的功能。如果可能的话,您能否将代码语法更改为Java? - George
我用Java版本编辑了答案;-) - Oli
在这行代码中 Dataset<Row> df2_as1 = df2.select(JavaConverters.asScalaBuffer(cols).toSeq());,我使用了 JavaConvesions 而没有使用 toSeq(),但代码对我来说是正常的.. 这里有什么需要注意的吗? - George
说实话,我不是Java/Scala转换的专家。我主要使用Scala编码,但我认为只要您将列表转换并将其提供给Spark进行编译,它应该按预期工作。 - Oli

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接