I need to join tables using Spark SQL or the DataFrame API, and would like to know the most optimized way of achieving it.

The scenario is:

- All data is present in Hive in ORC format (the base DataFrame and the reference files).
- I need to join one base file (DataFrame) read from Hive with 11-13 other reference files to create a big in-memory structure (400 columns, around 1 TB in size).

What is the best approach to achieve this? Please share your experience if someone has encountered a similar problem.
My default advice on how to optimize joins is:

1. Use a broadcast join if you can (see this notebook). From your question it seems your tables are large and a broadcast join is not an option.
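For reference, a minimal sketch of the broadcast hint in the DataFrame API (table and column names are placeholders; assumes an active SparkSession named `spark`):

```scala
import org.apache.spark.sql.functions.broadcast

// Placeholder table/column names; `spark` is assumed to be an
// active SparkSession.
val base = spark.table("base_table")
val ref  = spark.table("small_reference_table")

// The broadcast hint ships the small table to every executor, so the
// large base table is joined in place without being shuffled.
val joined = base.join(broadcast(ref), Seq("join_key"))
```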
2. Consider using a very large cluster (it's cheaper than you may think). $250 right now (6/2016) buys about 24 hours of 800 cores with 6 TB of RAM and many SSDs on the EC2 spot instance market. When thinking about the total cost of a big data solution, I find that humans tend to substantially undervalue their time.
3. Use the same partitioner. See this question for information on co-grouped joins.
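In the DataFrame API, the closest equivalent is to repartition both sides identically on the join key, so the planner sees matching partitioning and does not shuffle again for the join itself. A sketch, with placeholder names and partition count:

```scala
import org.apache.spark.sql.functions.col

// Repartition both sides with the same key and the same partition
// count; the subsequent join can then reuse that partitioning instead
// of shuffling a second time. Names here are placeholders.
val base = spark.table("base_table").repartition(400, col("join_key"))
val ref  = spark.table("reference_table").repartition(400, col("join_key"))

val joined = base.join(ref, Seq("join_key"))
```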
4. If the data is huge and/or your cluster cannot grow enough, such that even (3) above leads to OOM, use a two-pass approach. First, re-partition the data and persist it using partitioned tables (dataframe.write.partitionBy()). Then, join the sub-partitions serially in a loop, "appending" to the same final result table.
Side note: I say "appending" above because in production I never use SaveMode.Append. It is not idempotent and that's a dangerous thing. Instead, I use SaveMode.Overwrite deep into the subtree of a partitioned table tree structure. Prior to 2.0.0 and 1.6.2 you'll have to delete _SUCCESS or metadata files or dynamic partition discovery will choke.
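Concretely, the idempotent pattern overwrites a single leaf of the partitioned tree rather than appending to the whole table. A sketch, where `result` and `id` stand in for the loop variables above and the paths are hypothetical:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.hadoop.fs.{FileSystem, Path}

// Overwrite one leaf directory of the partitioned table; re-running the
// job replaces that slice instead of duplicating rows.
result.write
  .mode(SaveMode.Overwrite)
  .orc(s"/warehouse/result/part_id=$id")

// On Spark versions before 2.0.0 / 1.6.2, also remove the marker file,
// or dynamic partition discovery chokes on it.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(s"/warehouse/result/part_id=$id/_SUCCESS"), false)
```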
Hope this helps.
Partition the source using hash partitions or range partitions, or write a custom partitioner if you know the join fields well. Partitioning will help to avoid repartitioning during joins, since Spark data from the same partition across tables will exist in the same location.
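At the RDD level this means giving both sides the same Partitioner before the join. A minimal sketch, where the `Record` type and the `baseRdd`/`refRdd` inputs are hypothetical stand-ins for the real data:

```scala
import org.apache.spark.HashPartitioner

// Hypothetical record type and field names.
case class Record(joinKey: String, payload: String)

// Partitioning both pair-RDDs with the *same* partitioner co-locates
// matching keys, so the join itself needs no further shuffle.
val partitioner = new HashPartitioner(400)

val baseByKey = baseRdd.map(r => (r.joinKey, r)).partitionBy(partitioner)
val refByKey  = refRdd.map(r => (r.joinKey, r)).partitionBy(partitioner)

val joined = baseByKey.join(refByKey)
```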
ORC will definitely help the cause.
If this is still causing spills, try using Tachyon, which will be faster than disk.
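On the Spark 1.x versions current at the time, the hook for this was OFF_HEAP persistence, which stored blocks in Tachyon rather than spilling to local disk. A sketch, assuming the two-sided join result from above; the external block store URL is an assumption about your deployment:

```scala
import org.apache.spark.storage.StorageLevel

// On Spark 1.x, OFF_HEAP blocks go to Tachyon; point the external block
// store at your Tachyon master (address below is a placeholder), e.g.:
//   spark.externalBlockStore.url = tachyon://tachyon-master:19998
joined.persist(StorageLevel.OFF_HEAP)
joined.count()  // force materialization of the cached data
```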