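I'm trying to create an aggregated file for end users to consume, so they don't have to deal with multiple, much larger source files. To do this, I:
A) Iterate through all the source folders, strip out the 12 most commonly requested fields, and spin out Parquet files to a new location where these results are co-located.
B) Go back through the files created in step A and re-aggregate them by grouping on the 12 fields, reducing them to one summary row per unique combination.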
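To make the intended semantics of steps A and B concrete, here is a toy sketch using plain Scala collections instead of Spark. Three hypothetical key fields (`f1`, `f2`, `f3`) stand in for the real twelve, and a hypothetical `payload` column stands in for everything step A drops. It only shows why the summed `rCount` after step B should equal the row count after step A; it says nothing about how Parquet sizes the output.

```scala
// Toy, in-memory sketch of steps A and B using plain Scala collections.
// f1/f2/f3 are hypothetical stand-ins for the twelve real key fields.
object StepsSketch {
  // One "wide" source row; `payload` represents the columns dropped in step A.
  final case class SourceRow(f1: String, f2: String, f3: String, payload: String)

  // Row shape shared by steps A and B: the key fields plus a count.
  final case class KeyRow(f1: String, f2: String, f3: String, rCount: Long)

  // Step A: keep only the key fields and tag every row with rCount = 1.
  def stepA(rows: Seq[SourceRow]): Seq[KeyRow] =
    rows.map(r => KeyRow(r.f1, r.f2, r.f3, 1L))

  // Step B: group by the key fields and sum rCount, one row per unique key.
  def stepB(rows: Seq[KeyRow]): Seq[KeyRow] =
    rows
      .groupBy(r => (r.f1, r.f2, r.f3))
      .map { case ((a, b, c), group) => KeyRow(a, b, c, group.map(_.rCount).sum) }
      .toSeq

  val source: Seq[SourceRow] = Seq(
    SourceRow("a", "x", "1", "p1"),
    SourceRow("a", "x", "1", "p2"),
    SourceRow("a", "y", "1", "p3"),
    SourceRow("b", "x", "2", "p4"),
    SourceRow("b", "x", "2", "p5"),
    SourceRow("b", "x", "2", "p6")
  )

  val projected: Seq[KeyRow] = stepA(source)    // 6 rows, each with rCount = 1
  val aggregated: Seq[KeyRow] = stepB(projected) // 3 unique keys, counts sum to 6
}
```

The same invariant is what "my counts match" refers to: the row count shrinks, but the total of `rCount` is preserved.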
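I'm finding that step A reduces the payload 5:1 (roughly 250 GB becomes 48.5 GB). However, instead of shrinking it further, step B grows it by more than 50% compared with step A, even though my counts match.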
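The exact figures reported in the code comments make the puzzle sharper. A rough calculation, assuming "GB"/"gigs" means 10^9 bytes (the ratios don't depend on that choice, the bytes-per-row values do), shows step B writes about 5.15 times fewer rows yet spends roughly 8 times as many compressed bytes per row, and the total grows by about 63%.

```scala
// Back-of-the-envelope ratios from the sizes and row counts reported in the question.
// Assumes 1 GB = 1e9 bytes for the bytes-per-row figures.
object SizeArithmetic {
  val sourceGB = 250.0          // approximate size of the source folders
  val stepAGB = 48.65           // step A output in S3
  val stepBGB = 79.32           // step B output in S3
  val stepARows = 16969050506L  // rows written by step A
  val stepBRows = 3295206761L   // rows written by step B

  val reductionA: Double = sourceGB / stepAGB                // ~5.14 : 1
  val growthB: Double = stepBGB / stepAGB                    // ~1.63, i.e. ~63% larger
  val rowReduction: Double = stepARows.toDouble / stepBRows  // ~5.15 : 1

  // Compressed bytes per row.
  val bytesPerRowA: Double = stepAGB * 1e9 / stepARows       // ~2.87
  val bytesPerRowB: Double = stepBGB * 1e9 / stepBRows       // ~24.07
}
```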
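This is with Spark 1.5.2. Below is my code, with only the field names changed to make it more readable, along with the results I observed.
While I don't necessarily expect another 5:1 reduction, I can't see what I'm doing wrong that makes fewer rows with the same schema take up more storage. Can anyone help me understand what I'm doing wrong?
Thanks!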
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val extractStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(extractStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val aggregateStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(aggregateStatement).coalesce(20).write.parquet("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9: string (nullable = true) (15 characters)
|-- field10: string (nullable = true) (20 characters)
|-- field11: string (nullable = true) (14 characters)
|-- field12: string (nullable = true) (14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)
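For scale, the character counts listed in the schema give an upper bound on the uncompressed width of one row. This is a rough sketch assuming one byte per character (ASCII), 8 bytes for the `long` rCount, and ignoring `dt` and any Parquet encoding overhead. Compare it with the compressed sizes reported above: 48.65 GB for about 17 billion rows in step A, and 79.32 GB for about 3.3 billion rows in step B.

```scala
// Upper bound on one row's uncompressed width, from the per-field character
// counts in the schema (field3 is "50 characters max"; the rest are as listed).
// Assumes 1 byte per character and 8 bytes for rCount; dt and overhead ignored.
object RowWidth {
  val fieldChars: Seq[Int] = Seq(10, 15, 50, 10, 10, 10, 16, 10, 15, 20, 14, 14)
  val rCountBytes = 8
  val maxRowBytes: Int = fieldChars.sum + rCountBytes // 194 + 8 = 202
}
```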