I have some huge files (19 GB, 40 GB, etc.). I need to execute the following algorithm on these files:
- Read the file
- Sort it on the basis of one column
- Take the first 70% of the data:
a) Take all the distinct records of the subset of the columns
b) Write them to the train file
- Take the last 30% of the data:
a) Take all the distinct records of the subset of the columns
b) Write them to the test file
I tried running the following code in Spark (using Scala).
import org.apache.spark.sql.functions.max
val offers = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.option("delimiter", ",")
.load("input.txt")
val csvOuterJoin = offers.orderBy("utcDate")
val trainDF = csvOuterJoin.limit((csvOuterJoin.count*.7).toInt)
val maxTimeTrain = trainDF.agg(max("utcDate"))
val maxtimeStamp = maxTimeTrain.collect()(0).getTimestamp(0)
val testDF = csvOuterJoin.filter(csvOuterJoin("utcDate") > maxtimeStamp)
val inputTrain = trainDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
val inputTest = testDF.select("offerIdClicks","userIdClicks","userIdOffers","offerIdOffers").distinct
inputTrain.rdd.coalesce(1,false).saveAsTextFile("train.csv")
inputTest.rdd.coalesce(1,false).saveAsTextFile("test.csv")
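For what it's worth, I believe the last two lines could also be written through the DataFrame writer instead of dropping down to the RDD API, something like the sketch below (the output paths are placeholders, and I'm not sure whether this changes the memory behaviour at all):
// Write each distinct subset straight from the DataFrame API instead of
// converting to an RDD of Row objects first. coalesce(1) on the DataFrame
// still funnels the final write through a single partition, producing one
// part file per output directory.
inputTrain.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("train_out")

inputTest.coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("test_out")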
This is how I launch spark-shell:
./bin/spark-shell --packages com.databricks:spark-csv_2.11:1.4.0 --total-executor-cores 70 --executor-memory 10G --driver-memory 20G
I execute this code on a distributed cluster with one master and many slaves, each having a sufficient amount of RAM. As of now, this code ends up consuming a lot of memory and I get Java heap space errors.
Is there a way to optimize the above code (preferably in Spark)? I would appreciate any help.