I am running the below code in spark to compare the data stored in a csv file and a hive table. My data file is about 1.5GB and about 0.2 billion rows. When I run the code below, I am getting GC overhead limit exceeded error
. I am not sure why I am getting this error. I have search various articles.
The error comes at Test 3 step sourceDataFrame.except(targetRawData).count > 0
I am not sure if there is any memory leak or not. How can I debug and resolve the same?
import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext
//val conf = new SparkConf().setAppName("Simple Application")
//val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()
// set source and target location
//val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
val targetTableName = "temp_TableA"
// Extract source data
println("Extracting SAS source data from csv file location " + sourceDataLocation);
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val sourceRawCsvData = sc.textFile(sourceDataLocation)
println("Extracting target data from hive table " + targetTableName)
val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)
// Add the test cases here
// Test 1 - Validate the Structure
println("Validating the table structure...")
var startTime = getTimestamp()
val headerColumns = sourceRawCsvData.first().split(",").to[List]
val schema = TableASchema(headerColumns)
val sourceData = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
.map(_.split(",").toList)
.map(row)
val sourceDataFrame = spark.createDataFrame(sourceData,schema)
//val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)
val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
var endTime = getTimestamp()
if (sourceSchemaList.diff(targetSchemaList).length > 0) {
println("Updating StructureValidation result in table...")
UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
// Force exit here if needed
// sys.exit(1)
} else {
println("Updating StructureValidation result in table...")
UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
}
// Test 2 - Validate the Row count
println("Validating the Row count...")
startTime = getTimestamp()
// check the row count.
val sourceCount = sourceData.count()
val targetCount = targetRawData.count()
endTime = getTimestamp()
if (sourceCount != targetCount){
println("Updating RowCountValidation result in table...")
// Update the result in the table
UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
// Force exit here if needed
//sys.exit(1)
}
else{
println("Updating RowCountValidation result in table...")
// Update the result in the table
UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
}
// Test 3 - Validate the data
println("Comparing source and target data...")
startTime = getTimestamp()
if (sourceDataFrame.except(targetRawData).count > 0 ){
endTime = getTimestamp()
// Update the result in the table
println("Updating DataValidation result in table...")
UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
// Force exit here if needed
// sys.exit(1)
}
else{
endTime = getTimestamp()
println("Updating DataValidation result in table...")
// Update the result in the table
UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
}
// Test 4 - Calculate the average and variance of Int or Dec columns
// Test 5 - String length validation
def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String){
val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$description')"
val a = hc.sql(insertString)
}
def TableASchema(columnName: List[String]): StructType = {
StructType(
Seq(
StructField(name = "datetime", dataType = TimestampType, nullable = true),
StructField(name = "load_datetime", dataType = TimestampType, nullable = true),
StructField(name = "source_bank", dataType = StringType, nullable = true),
StructField(name = "emp_name", dataType = StringType, nullable = true),
StructField(name = "header_row_count", dataType = IntegerType, nullable = true),
StructField(name = "emp_hours", dataType = DoubleType, nullable = true)
)
)
}
def row(line: List[String]): Row = {
Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).toInt, line(5).toDouble)
}
def convertToTimestamp(s: String) : Timestamp = s match {
case "" => null
case _ => {
val format = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => t
case Failure(_) => null
}
}
}
def convertToDate(s: String) : Timestamp = s match {
case "" => null
case _ => {
val format = new SimpleDateFormat("ddMMMyyyy")
Try(new Timestamp(format.parse(s).getTime)) match {
case Success(t) => t
case Failure(_) => null
}
}
}
def flatten(scheme: StructType): Array[StructField] = scheme.fields.flatMap { f =>
f.dataType match {
case struct:StructType => flatten(struct)
case _ => Array(f)
}
}
def getTimestamp(): String = {
val now = java.util.Calendar.getInstance()
val timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
timestampFormat.format(now.getTime())
}
Exception is below:
17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)