为Spark数据框/数据集进行有效连接的分区数据

18

我需要基于一些共享键列将许多DataFrame连接在一起。对于一个键值对RDD,可以指定一个分区器,使得具有相同键的数据点被洗牌到同一个执行器,因此如果在join之前存在与洗牌相关的操作,则连接更加高效。是否可以在Spark DataFrames或DataSets上执行相同的操作?


@Shaido,在使用coleasc()时,我们需要遵循哪些步骤才能获得最佳的Spark作业性能? - Shasu
2个回答

34

如果您知道将要多次连接数据框,则可以在加载后重新划分数据框。

val users = spark.read.load("/path/to/users").repartition('userId)

val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition

val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned

它会将数据洗牌一次,然后在连续多次连接时重用这些洗牌文件。

但是,如果您知道将在某些键上重复地对数据进行洗牌,则最好的选择是将数据保存为存储桶表。这将把数据预先哈希分区并写出,因此当您读取表并将它们连接起来时,可以避免洗牌。您可以按以下方式执行:

// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")

val users = spark.read.table("users")
val addresses = spark.read.table("addresses")

val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned
为了避免洗牌,表格必须使用相同的桶分配方式(例如使用相同数量的桶,同时在桶列上进行连接)。

1
单个连接使用重新分区是否有利润? - Cherry
不会有任何好处,但也不会有任何伤害。除非是广播,否则Spark仍需要进行shuffle以进行连接。 - Silvio
2
我正在尝试理解为什么对于Spark来优化第二个作业,您需要通过userId显式地重新分区。在第一个需要洗牌的作业之后,Spark不知道数据现在已经按userid进行了分区吗? - allstar
@Silvio,谢谢你,在保存时即saveAsTable(“users”)被保存在哪里?下一次作业运行时,它会被覆盖吗?如何处理它?我的数据源是s3路径。 - Shasu

7

通过使用DataFrame / DataSet API中的repartition方法,可以实现数据分区。使用此方法,您可以指定一个或多个列用于数据分区,例如:

val df2 = df.repartition($"colA", $"colB")

同时,在同一条命令中指定所需分区数量也是可能的。

val df2 = df.repartition(10, $"colA", $"colB")

注意:这并不保证数据框的分区将位于同一节点上,只是确保以相同的方式进行分区。


谢谢,以上两个版本有什么确切的不同,在第二种情况下如何避免空分区? - Shasu
@Shyam:第一行将使用配置的默认分区数(200)。也许这可以帮助回答第二个问题:https://dev59.com/D6vka4cB1Zd3GeqP0uxJ - Shaido
非常感谢,你总是很有帮助。还有一件事,我的Spark作业在连接时出现OOM错误,如何检查每个数据集所占用的内存?我可以在SparkUI中找到吗? - Shasu
我正在根据列的分组(by)来旋转数据框...但速度非常慢...如何进行调优? - Shasu

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接