为什么Spark Parquet文件中的聚合结果比原始数据还要大?

8
我正在尝试创建一个聚合文件,供最终用户使用,以避免他们处理更大的多个源文件。为此,我: A) 遍历所有源文件夹,剥离出最常被请求的12个字段,在新位置旋转parquet文件,其中这些结果位于同一位置。 B) 我试图回到步骤A中创建的文件,通过按12个字段分组来重新聚合它们,将其缩减为每个唯一组合的摘要行。
我发现,步骤A将有效负载减少了5:1(大约250 GB变为48.5 GB)。但是,步骤B并没有进一步减少,而是比步骤A增加了50%。但是,我的计数匹配。
这是使用Spark 1.5.2进行的 下面是我的代码,仅修改了字段名称以使其更易读,并附有我注意到的结果。
虽然我不一定希望再次减少5:1,但我不知道我做错了什么,以便在具有相同架构的少量行的情况下增加存储空间。是否有人能帮助我理解我做错了什么?
谢谢!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)
1个回答

10

一般来说,像Parquet这样的列式存储格式在数据分布(数据组织)和单个列的基数方面非常敏感。数据组织得越好,基数越低,存储效率就越高。

聚合操作必须对数据进行洗牌。当您查看执行计划时,您会看到它使用哈希分区器。这意味着在聚合之后,数据分配可能比原始数据更不有效率。同时,sum可以减少行数,但增加rCount列的基数。

您可以尝试不同的工具来纠正这个问题,但并非所有工具都适用于Spark 1.5.2:

  • 按照基数较低的列对整个数据集进行排序(由于需要完全洗牌而代价昂贵),或者使用sortWithinPartitions
  • 使用DataFrameWriterpartitionBy方法,使用基数较低的列对数据进行分区。
  • 使用DataFrameWriterbucketBysortBy方法(Spark 2.0.0+)使用桶和本地排序来改善数据分布。

1
在Spark 2.0.0中,似乎无法将bucketBy与DataFrameWriter一起使用。 - eliasah
2
但是,按低基数排序为什么有助于压缩?如果我理解正确,排序有助于运行长度编码,而相对较低的基数列使用字典编码进行压缩-那么为什么排序很重要?除非...将这些值分组在一起有助于推断出基数较低,并且如果它们分布,则可能不会发生这种推断(这是一个问题,不是陈述)。 - Vitaliy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接