如何在Spark SQL中控制分区大小

27

我需要使用Spark SQL的 HiveContext 从Hive表中加载数据并将其加载到HDFS中。默认情况下,SQL输出的 DataFrame 有2个分区。为了获得更多的并行性,我需要从SQL中获得更多的分区。但是在 HiveContext 中没有重载的方法可以接受分区数参数。

重新分区RDD会导致数据混洗,并增加处理时间。

>

val result = sqlContext.sql("select * from bt_st_ent")

已输出以下日志内容:

Starting task 0.0 in stage 131.0 (TID 297, aster1.com, partition 0,NODE_LOCAL, 2203 bytes)
Starting task 1.0 in stage 131.0 (TID 298, aster1.com, partition 1,NODE_LOCAL, 2204 bytes)

我想知道是否有办法增加SQL输出的分区大小。

3个回答

21

Spark < 2.0:

您可以使用Hadoop配置选项:

  • mapred.min.split.size.
  • mapred.max.split.size

同时,HDFS块大小可以用于控制基于文件系统的格式*的分区大小。

val minSplit: Int = ???
val maxSplit: Int = ???

sc.hadoopConfiguration.setInt("mapred.min.split.size", minSplit)
sc.hadoopConfiguration.setInt("mapred.max.split.size", maxSplit)

Spark 2.0+:

您可以使用spark.sql.files.maxPartitionBytes配置:

spark.conf.set("spark.sql.files.maxPartitionBytes", maxSplit)

在这两种情况下,这些值可能不会被特定数据源API使用,因此您应始终检查所使用格式的文档/实现详细信息。

* 其他输入格式可以使用不同的设置。例如,请参见

此外,从RDDs创建的Datasets将继承其父级的分区布局。

类似地,桶式表将使用元数据存储中定义的桶布局,并与Dataset分区具有1:1的关系。


5

这是一个非常普遍且令人痛苦的问题。您应该寻找一个可以将数据均匀分区的关键字。然后,您可以使用DISTRIBUTE BYCLUSTER BY操作符告诉Spark在分区中对行进行分组。这将导致查询本身产生一些开销,但会得到大小均匀的分区。 Deepsense有一个非常好的教程。


2
对于从谷歌搜索到这里的人:Deepsense似乎已更改其顶级域,因此上面的链接已过时。 帖子现在在https://deepsense.ai/optimize-spark-with-distribute-by-and-cluster-by/。 - Julian Neuberger
我在阅读时尝试在冰山表上实现分布/集群,但数据框上创建的分区数量和大小都是相同的。我正在使用主键来分布数据。 - undefined

3

如果您的SQL执行洗牌操作(例如它具有连接或某种分组),则可以通过设置“spark.sql.shuffle.partitions”属性来设置分区数。

 sqlContext.setConf( "spark.sql.shuffle.partitions", 64)

跟随Fokko的建议,你可以使用随机变量进行聚类。
val result = sqlContext.sql("""
   select * from (
     select *,random(64) as rand_part from bt_st_ent
   ) cluster by rand_part""")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接