无法在Spark中将有序数据写入Parquet

8
我正在使用Apache Spark生成parquet文件。我可以按日期对它们进行分区,但在内部似乎无法以正确的顺序布置数据。
处理过程中似乎会丢失顺序,这意味着parquet元数据不正确(具体来说,我希望确保parquet行组反映排序顺序,以便特定于我的用例的查询可以通过元数据有效地进行过滤)。
请考虑以下示例:
// note: hbase source is a registered temp table generated from hbase
val transformed = sqlContext.sql(s"SELECT  id, sampleTime, ... , toDate(sampleTime) as date FROM hbaseSource")

// Repartion the input set by the date column (in my source there should be 2 distinct dates)
val sorted = transformed.repartition($"date").sortWithinPartitions("id", "sampleTime")

sorted.coalesce(1).write.partitionBy("date").parquet(s"/outputFiles")

采用这种方法,我可以得到正确的parquet分区结构(按日期)。更好的是,对于每个日期分区,我可以看到一个单独的大型parquet文件。

 /outputFiles/date=2018-01-01/part-00000-4f14286c-6e2c-464a-bd96-612178868263.snappy.parquet

然而,当我查询文件时,发现内容是无序的。具体来说,“无序”更像是将几个有序的数据帧分区合并到了文件中。Parquet行组元数据显示排序字段实际上是重叠的(例如,特定ID可能位于许多行组中):
id:             :[min: 54, max: 65012, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 827, max: 65470, num_nulls: 0]
sampleTime:     :[min: 1514764810000000, max: 1514851190000000, num_nulls: 0]
id:             :[min: 1629, max: 61412, num_nulls: 0]

我希望每个文件中的数据都能得到适当的排序,以便每个行组内的元数据最小/最大值不会重叠。

例如,这就是我想看到的模式:

RG 0: id:             :[min: 54, max: 100, num_nulls: 0]
RG 1: id:             :[min: 100, max: 200, num_nulls: 0]

...其中RG表示"行组"。如果我想要id=75,查询可以在一个行组中找到它。

我尝试了许多以上代码的变化。例如,使用或不使用coalesce(我知道coalesce很糟糕,但我的想法是使用它来防止洗牌)。我还尝试过sort而不是sortWithinPartitions(sort应该创建一个完全有序的排序,但会导致许多分区)。例如:

val sorted = transformed.repartition($"date").sort("id", "sampleTime") 
sorted.write.partitionBy("date").parquet(s"/outputFiles")

给了我200个文件,这太多了,而且它们还没有正确排序。我可以通过调整洗牌大小来减少文件数量,但我本来希望在写入过程中按顺序处理排序(我认为写入不会对输入进行洗牌)。我看到的顺序如下(为简洁起见省略了其他字段):

+----------+----------------+
|id|      sampleTime|
+----------+----------------+
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|
|     57834|1514785180000000|
|     56868|1514840220000000|

这个看起来像是交错排序分区。所以我认为 repartition 在这里并没有什么作用,而且 sort 似乎无法在写入时保持顺序。

我已经读过可以实现我想要的功能。我甚至尝试过 Ryan Blue 在演示文稿《Parquet 性能调优:缺失的指南》中提出的方法(不幸的是它在 OReily 支付墙后面)。那种情况下,Spark 似乎使用了一个旧版本的 Parquet-mr,导致元数据损坏,而我不确定如何升级它。

我不确定我做错了什么。我的感觉是我误解了 repartition($"date")sort 的工作方式和/或交互方式。

我会感激任何想法。对于这篇长文,我表示歉意。 :)

编辑: 还要注意,如果我在 transformed.sort("id", "sampleTime") 上执行 show(n),数据将被正确排序。所以问题似乎出现在写入阶段。正如上面所提到的,看起来排序的输出在写入时被洗牌了。

2个回答

10
问题在于,当保存文件格式时,Spark需要一些顺序。如果无法满足该顺序,Spark将在保存过程中根据要求对数据进行排序,并忘记您的排序。更具体地说,Spark需要按此顺序进行操作(这直接摘自Spark 2.4.4的源代码):
val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns

在这里,partitionColumns是您用于对数据进行分区的列。由于您没有使用桶排序,因此在本示例中,bucketingIdExpressionsortColumns不相关,并且requiredOrdering将仅为partitionColumns。因此,如果这是您的代码:

val sorted = transformed.repartition($"date").sortWithinPartitions("id", 
"sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark会检查数据是否按日期排序,但实际上并没有排序,因此Spark将忽略您的排序并按日期排序。另一方面,如果您像这样操作:

val sorted = transformed.repartition($"date").sortWithinPartitions("date", "id", 
"sampleTime")

sorted.write.partitionBy("date").parquet(s"/outputFiles")

Spark会再次检查数据是否按日期排序,这一次已经排好序了(满足要求),所以Spark将保留此顺序,并在保存数据时不进行更多的排序。因此,我相信这种方法应该可以正常工作。


注意:在这个例子中,“sortColumns”是相关的;问题在于,除非“partitionColumns”也经过“sortWithinPartitions”的处理,否则Spark会忽略它们。 - ijoseph

0

只是一个想法,先使用coalesce排序: “.coalesce(1).sortWithinPartitions()”。另外,预期结果看起来很奇怪——为什么需要有序的parquet数据?在读取后进行排序似乎更合适。


1
感谢您的回复。正如所述,将数据在保存到parquet之前进行排序,意味着parquet元数据将对我的用例进行优化。这是一种众所周知的parquet查询优化方式,适用于具有特定过滤器的查询,因为读取器可以跳过整个parquet行组(而不必检查它们)。我尝试了许多coalescesortWithinPartitions的变体(包括您的建议)。根据我的最终记录,直接排序操作的结果是有序的。排序似乎会在最终写入时发生改变。 - ZenMasterZed

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接