基于列值对Spark DataFrame进行分区?

12

我有一个来自SQL数据源的数据框,它看起来像这样:

User(id: Long, fname: String, lname: String, country: String)

[1, Fname1, Lname1, Belarus]
[2, Fname2, Lname2, Belgium]
[3, Fname3, Lname3, Austria]
[4, Fname4, Lname4, Australia]

我想要将这些数据按照国家名称首字母分区,并写入csv文件中,因此输出文件中的Belarus和Belgium应该为一组,Austria和Australia则在另一组。
2个回答

12

以下是您可以做的事情

import org.apache.spark.sql.functions._
//create a dataframe with demo data
val df = spark.sparkContext.parallelize(Seq(
  (1, "Fname1", "Lname1", "Belarus"),
  (2, "Fname2", "Lname2", "Belgium"),
  (3, "Fname3", "Lname3", "Austria"),
  (4, "Fname4", "Lname4", "Australia")
)).toDF("id", "fname","lname", "country")

//create a new column with the first letter of column
val result = df.withColumn("countryFirst", split($"country", "")(0))

//save the data with partitionby first letter of country 

result.write.partitionBy("countryFirst").format("com.databricks.spark.csv").save("outputpath")

修改: 您还可以使用Raphel建议的子字符串来提高性能,如下所示

substring(Column str, int pos, int len) 子字符串从pos开始,长度为len, 当str类型为String时返回字符串类型的子串,当str类型为Binary时返回以byte类型表示的子数组,其起始位置为pos,长度为len。

val result = df.withColumn("firstCountry", substring($"country",1,1))

然后使用 partitionby 写入

希望这能解决你的问题!


除了这个问题,使用df.withColumn是否会有性能损失,或者是否可以以更有效的方式完成? - jdk2588
1
你也可以使用Spark的substring函数代替split,我认为这更易读。 - Raphael Roth
1
我们可以使用多列来完成这个任务吗? - user482963
2
这会不会在输出数据中添加一个名为“countryFirst”的额外列?有没有办法在输出数据中没有该列,但仍按“countryFirst”列分区数据?一种简单的方法是迭代“countryFirst”的不同值,并针对每个不同的“countryFirst”值编写过滤数据。这样,您可以避免在输出中写入额外的列。我希望能做得更好。 - Omkar Neogi

1
为解决这个问题的一个替代方案是首先创建一个只包含每个国家第一个字母的列。完成此步骤后,您可以使用partitionBy将每个分区保存到单独的文件中。
dataFrame.write.partitionBy("column").format("com.databricks.spark.csv").save("/path/to/dir/")

这将根据列值创建分区,因此我们将拥有白俄罗斯和比利时的单独文件,而不是一个文件。 - jdk2588
1
是的,正如我所提到的,您需要首先创建一个包含国家首字母的单独列。然后在该列上使用 partitionBy - Shaido

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接