如何在Spark SQL中强制使用内存分块排序?

3

Parquet文件格式对记录的顺序非常敏感。其列式编码可能会根据排序顺序生成显著较小的文件。

另一方面,对1TB的输入记录进行排序非常昂贵。

将其拆分为大小为10GB的块可以在内存中进行排序,同时生成几乎与完全排序的1 TB一样小的parquet文件。

是否可以指示Spark SQL在生成parquet文件之前进行分块排序?

另一个用例是将许多小的Parquet文件合并成一个统一的Parquet文件,同时在写入统一的Parquet文件之前使用分块排序。

1个回答

1
据我所知,在Spark < 2.0.0中,没有这样的选项可供选择。您可以尝试在写入之前将coalesce与Hive SORT BY子句组合使用,这应该具有类似的效果。
val df: DataFrame = ???
val n: Int = ??? //

df.coalesce(n)
df.coalesce(n).registerTempTable("df")
sqlContext.sql("SELECT * FROM df SORT BY foo, bar").write.parquet(...)

or

df.coalesce(n).sortWithinPartitions($"foo", $"bar").write.parquet(...)

请记住,SORT BYDataFrame.sort不等同。

Spark 2.0.0引入了sortBybucketBy方法,其中后者通过给定的列对每个桶中的输出进行排序,并且应支持Parquet:

val df: DataFrame = ???
val nBuckets: Int = ???

df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").saveAsTable(...)

注意:这似乎只在使用saveAsTable保存Parquet文件时有效,但它似乎不直接支持parquet writer (df.write.bucketBy(...).sortBy(...).parquet(...)) 在 spark-2.0.0-preview中。

感谢zero323,就我所理解的,sortWithinPartitions 只是避免了最后的合并排序。然而,每个分区都将完全对其数据进行排序,如果无法适应内存,则将使用文件。我在哪里可以获取有关bucketBy和版本2.0.0的更多信息? - Ehud Eshet
sortWithinPartitions 避免了最终合并以及执行完整排序所需的洗牌。据我所知,Spark 通常不使用内存排序,因为它不假设单个分区的数据适合于内存。但我不确定是否有任何特定于 SQL 的优化正在发挥作用。您可以使用 rdd.mapPartitions 将 Iterator 转换为本地结构并直接进行排序。关于 sortBy,我知道的唯一参考是 相应的 JIRA 和源代码 / 测试。 - zero323
spark-2.2.0 不支持 df.write.bucketBy(...).sortBy(...).parquet(...),但会抛出适当的异常。SPARK-15718 SQL better error message for writing bucketed data - ruseel

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接