I am trying to create an aggregate file for end users to work from, so they don't have to process multiple sources with much larger files. To do that I: A) iterate through all source folders, stripping out the 12 most commonly requested fields and spinning out Parquet files in a new location where these results are co-located; B) go back through the files created in step A and re-aggregate them by grouping on the 12 fields, reducing the data to one summary row for each unique combination.
What I'm finding is that step A reduces the payload 5:1 (roughly 250 gigs becomes 48.5 gigs). Step B, however, instead of reducing this further, increases it by roughly 50% over step A. My counts still match, though.
This is using Spark 1.5.2
My code, modified only to replace the field names with field1...field12 to make it more readable, is below with the results I've noted.
While I don't necessarily expect another 5:1 reduction, I don't know what I'm doing incorrectly that increases the storage size for fewer rows with the same schema. Can anyone help me understand what I did wrong?
Thanks!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlStatement).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9: string (nullable = true) (15 characters)
|-- field10: string (nullable = true) (20 characters)
|-- field11: string (nullable = true) (14 characters)
|-- field12: string (nullable = true) (14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)
In general, columnar storage formats like Parquet are highly sensitive to data distribution (how the data is organized) and to the cardinality of individual columns. The more organized the data and the lower the cardinality, the more efficient the storage.
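To see this effect in isolation, here is a minimal sketch (Spark 1.5.2 API) that writes the same DataFrame twice, once as-is and once sorted. The choice of field1, field4 and field5 as low-cardinality sort columns and the output paths are only placeholders; the point is that sorted data gives Parquet's dictionary and run-length encodings long runs of repeated values to collapse:

//read back the step-A output; paths and sort columns are illustrative
val combined = sqlContext.read.parquet("<aws folder>" + dt + "/*/")

//as-is: row order still reflects the upstream shuffle
combined.coalesce(20).write.parquet("<aws folder>unsorted/" + dt + "/")

//sorted by (assumed) low-cardinality columns: repeated values end up
//adjacent, which usually compresses much better in a columnar format
combined.sort("field1", "field4", "field5")
  .coalesce(20)
  .write.parquet("<aws folder>sorted/" + dt + "/")

Comparing the two S3 prefixes, which hold the same rows, the sorted copy will usually come out noticeably smaller.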
An aggregation like the one you apply has to shuffle the data. When you check the execution plan you'll see it uses a hash partitioner, which means that after the aggregation the distribution can be less efficient than that of the original data. At the same time, sum can reduce the number of rows but increases the cardinality of the rCount column.
You can try different tools to correct for that, but not all of them are available in Spark 1.5.2:
- sortWithinPartitions to sort data locally within each partition before writing.
- the partitionBy method of DataFrameWriter to partition the data using low-cardinality columns.
- the bucketBy and sortBy methods of DataFrameWriter (Spark 2.0.0+) to improve data distribution using bucketing and local sorting.
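As a rough sketch of what those options look like applied to your aggregated result (the method names are the real DataFrame/DataFrameWriter APIs, but only partitionBy exists in 1.5.2; sortWithinPartitions needs 1.6.0+ and bucketBy/sortBy need 2.0.0+; the paths, table name and chosen columns are illustrative):

val aggregated = sqlContext.sql(sqlStatement)

//Spark 1.6.0+: sort locally inside each partition by low-cardinality
//columns so every output file holds long runs of repeated values
aggregated.coalesce(20)
  .sortWithinPartitions("field1", "field4")
  .write.parquet("<a new aws folder>" + dt + "/")

//Spark 1.4.0+ (so available to you): partition the output on a
//low-cardinality column; its values move into the directory structure
//instead of the data pages
aggregated.write.partitionBy("field1")
  .parquet("<a new aws folder>" + dt + "/")

//Spark 2.0.0+: bucket and locally sort; note that bucketBy is only
//supported together with saveAsTable, not a plain path-based write
aggregated.write.bucketBy(20, "field1").sortBy("field2")
  .saveAsTable("aggregated_results")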