Parquet文件格式对记录的顺序非常敏感。其列式编码可能会根据排序顺序生成显著较小的文件。
另一方面,对1TB的输入记录进行排序非常昂贵。
将其拆分为大小为10GB的块可以在内存中进行排序,同时生成几乎与完全排序的1 TB一样小的parquet文件。
是否可以指示Spark SQL在生成parquet文件之前进行分块排序?
另一个用例是将许多小的Parquet文件合并成一个统一的Parquet文件,同时在写入统一的Parquet文件之前使用分块排序。
Parquet文件格式对记录的顺序非常敏感。其列式编码可能会根据排序顺序生成显著较小的文件。
另一方面,对1TB的输入记录进行排序非常昂贵。
将其拆分为大小为10GB的块可以在内存中进行排序,同时生成几乎与完全排序的1 TB一样小的parquet文件。
是否可以指示Spark SQL在生成parquet文件之前进行分块排序?
另一个用例是将许多小的Parquet文件合并成一个统一的Parquet文件,同时在写入统一的Parquet文件之前使用分块排序。
coalesce
与Hive SORT BY
子句组合使用,这应该具有类似的效果。val df: DataFrame = ???
val n: Int = ??? //
df.coalesce(n)
df.coalesce(n).registerTempTable("df")
sqlContext.sql("SELECT * FROM df SORT BY foo, bar").write.parquet(...)
or
df.coalesce(n).sortWithinPartitions($"foo", $"bar").write.parquet(...)
SORT BY
与DataFrame.sort
不等同。
Spark 2.0.0引入了sortBy
和bucketBy
方法,其中后者通过给定的列对每个桶中的输出进行排序,并且应支持Parquet:
val df: DataFrame = ???
val nBuckets: Int = ???
df.write.bucketBy(nBuckets, "foo").sortBy("foo", "bar").saveAsTable(...)
saveAsTable
保存Parquet文件时有效,但它似乎不直接支持parquet writer (df.write.bucketBy(...).sortBy(...).parquet(...)
) 在 spark-2.0.0-preview
中。
sortWithinPartitions
避免了最终合并以及执行完整排序所需的洗牌。据我所知,Spark 通常不使用内存排序,因为它不假设单个分区的数据适合于内存。但我不确定是否有任何特定于 SQL 的优化正在发挥作用。您可以使用rdd.mapPartitions
将 Iterator 转换为本地结构并直接进行排序。关于sortBy
,我知道的唯一参考是 相应的 JIRA 和源代码 / 测试。 - zero323spark-2.2.0
不支持df.write.bucketBy(...).sortBy(...).parquet(...)
,但会抛出适当的异常。SPARK-15718 SQL better error message for writing bucketed data - ruseel