我需要基于一些共享键列将许多DataFrame连接在一起。对于一个键值对RDD,可以指定一个分区器,使得具有相同键的数据点被洗牌到同一个执行器,因此如果在join
之前存在与洗牌相关的操作,则连接更加高效。是否可以在Spark DataFrames或DataSets上执行相同的操作?
我需要基于一些共享键列将许多DataFrame连接在一起。对于一个键值对RDD,可以指定一个分区器,使得具有相同键的数据点被洗牌到同一个执行器,因此如果在join
之前存在与洗牌相关的操作,则连接更加高效。是否可以在Spark DataFrames或DataSets上执行相同的操作?
如果您知道将要多次连接数据框,则可以在加载后重新划分数据框。
val users = spark.read.load("/path/to/users").repartition('userId)
val joined1 = users.join(addresses, "userId")
joined1.show() // <-- 1st shuffle for repartition
val joined2 = users.join(salary, "userId")
joined2.show() // <-- skips shuffle for users since it's already been repartitioned
它会将数据洗牌一次,然后在连续多次连接时重用这些洗牌文件。
但是,如果您知道将在某些键上重复地对数据进行洗牌,则最好的选择是将数据保存为存储桶表。这将把数据预先哈希分区并写出,因此当您读取表并将它们连接起来时,可以避免洗牌。您可以按以下方式执行:
// you need to pick a number of buckets that makes sense for your data
users.bucketBy(50, "userId").saveAsTable("users")
addresses.bucketBy(50, "userId").saveAsTable("addresses")
val users = spark.read.table("users")
val addresses = spark.read.table("addresses")
val joined = users.join(addresses, "userId")
joined.show() // <-- no shuffle since tables are co-partitioned
为了避免洗牌,表格必须使用相同的桶分配方式(例如使用相同数量的桶,同时在桶列上进行连接)。通过使用DataFrame / DataSet API中的repartition
方法,可以实现数据分区。使用此方法,您可以指定一个或多个列用于数据分区,例如:
val df2 = df.repartition($"colA", $"colB")
同时,在同一条命令中指定所需分区数量也是可能的。
val df2 = df.repartition(10, $"colA", $"colB")
注意:这并不保证数据框的分区将位于同一节点上,只是确保以相同的方式进行分区。