Spark GC Overhead limit exceeded error message

2019-07-09 01:55发布

问题:

I am running the code below in Spark to compare the data stored in a csv file and a Hive table. My data file is about 1.5 GB and has about 0.2 billion rows. When I run the code below, I get a GC overhead limit exceeded error. I am not sure why I am getting this error; I have searched various articles without finding a resolution.

The error occurs at the Test 3 step, `sourceDataFrame.except(targetRawData).count > 0`. I am not sure whether there is a memory leak. How can I debug and resolve this?

import org.apache.spark.sql.hive._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions.{to_date, to_timestamp}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.text._
import java.util.Date
import scala.util._
import org.apache.spark.sql.hive.HiveContext

  //val conf = new SparkConf().setAppName("Simple Application")
  //val sc = new SparkContext(conf)
  // `sc` is provided by the spark-shell; HiveContext gives access to the Hive metastore.
  val hc = new HiveContext(sc)
  // NOTE(review): "local" master means everything runs inside the single driver JVM.
  // With a 1.5 GB / 0.2B-row input this is a likely cause of the GC overhead error —
  // consider raising driver memory (--driver-memory) or running on a real cluster.
  val spark: SparkSession = SparkSession.builder().appName("Simple Application").config("spark.master", "local").getOrCreate()

   // set source and target location
    //val sourceDataLocation = "hdfs://localhost:9000/sourcec.txt"
    val sourceDataLocation = "s3a://rbspoc-sas/sas_valid_large.txt"
    val targetTableName = "temp_TableA"

    // Extract source data
    println("Extracting SAS source data from csv file location " + sourceDataLocation);
    // NOTE(review): sqlContext is created but never used below — candidate for removal.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      val sourceRawCsvData = sc.textFile(sourceDataLocation)

    println("Extracting target data from hive table " + targetTableName)
    // Trim string columns on the Hive side so they compare cleanly with the parsed CSV data.
    val targetRawData = hc.sql("Select datetime,load_datetime,trim(source_bank) as source_bank,trim(emp_name) as emp_name,header_row_count, emp_hours from " + targetTableName)


    // Add the test cases here

    // Test 1 - Validate the Structure
       println("Validating the table structure...")
       var startTime = getTimestamp()
       // First CSV line is the header row; it feeds TableASchema (which currently ignores it).
       val headerColumns = sourceRawCsvData.first().split(",").to[List]
       val schema = TableASchema(headerColumns)

       // Drop the header (it sits in partition 0), then parse each line into a Row.
       // NOTE(review): sourceData is re-evaluated by Tests 2 and 3; persisting it
       // (e.g. sourceData.persist()) would avoid re-reading and re-parsing the file.
       val sourceData = sourceRawCsvData.mapPartitionsWithIndex((index, element) => if (index == 0) element.drop(1) else element)
       .map(_.split(",").toList)
       .map(row)

       val sourceDataFrame = spark.createDataFrame(sourceData,schema)
       //val sourceDataFrame = sourceDataFrame.toDF(sourceDataFrame.columns map(_.toLowerCase): _*)

       // Compare the flattened column data types of both sides; diff is order-insensitive
       // with respect to extra target columns but reports types missing from the target.
       val sourceSchemaList = flatten(sourceDataFrame.schema).map(r => r.dataType.toString).toList
       val targetSchemaList = flatten(targetRawData.schema).map(r => r.dataType.toString).toList
       var endTime = getTimestamp()
       if (sourceSchemaList.diff(targetSchemaList).length > 0) {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed StructureValidation. ")
           // Force exit here if needed
          // sys.exit(1)
       } else {
           println("Updating StructureValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed StructureValidation. ")
       }

    // Test 2 - Validate the Row count
       println("Validating the Row count...")
       startTime = getTimestamp()
       // check the row count.
       // NOTE(review): each count() triggers a full scan; without caching, sourceData
       // is re-read from S3 and re-parsed here and again in Test 3.
       val sourceCount = sourceData.count()
       val targetCount = targetRawData.count()
       endTime = getTimestamp()
       if (sourceCount != targetCount){
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
           // Force exit here if needed
           //sys.exit(1)
         }
       else{
           println("Updating RowCountValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed RowCountValidation. Source count:$sourceCount and Target count:$targetCount")
         }


    // Test 3 - Validate the data
    // NOTE(review): except() is a set-difference that shuffles and deduplicates BOTH
    // datasets — this is where the reported "GC overhead limit exceeded" occurs.
    // Mitigations to try: persist the inputs, increase executor/driver memory, or
    // raise spark.sql.shuffle.partitions so each task handles less data.
    println("Comparing source and target data...")
    startTime = getTimestamp()
    if (sourceDataFrame.except(targetRawData).count > 0 ){
        endTime = getTimestamp()
        // Update the result in the table
        println("Updating DataValidation result in table...")
           UpdateResult(targetTableName, startTime, endTime, 1, s"FAILED: $targetTableName failed DataMatch validation")
           // Force exit here if needed
          // sys.exit(1)
         }
       else{
           endTime = getTimestamp()
           println("Updating DataValidation result in table...")
           // Update the result in the table
           UpdateResult(targetTableName, startTime, endTime, 0, s"SUCCESS: $targetTableName passed DataMatch validation")
         }

    // Test 4 - Calculate the average and variance of Int or Dec columns
    // Test 5 - String length validation

  /**
   * Inserts one validation-result row into the Hive table `TestResult`.
   *
   * @param tableName   name of the table that was validated
   * @param startTime   test start time, formatted "yyyy-MM-dd HH:mm:ss"
   * @param endTime     test end time, formatted "yyyy-MM-dd HH:mm:ss"
   * @param returnCode  0 = success, 1 = failure
   * @param description human-readable outcome message
   */
  def UpdateResult(tableName: String, startTime: String, endTime: String, returnCode: Int, description: String): Unit = {
    // Escape embedded single quotes so the description cannot break (or inject into)
    // the SQL string literal.
    val safeDescription = description.replace("'", "''")
    val insertString = s"INSERT INTO TABLE TestResult VALUES( FROM_UNIXTIME(UNIX_TIMESTAMP()),'$startTime','$endTime','$tableName',$returnCode,'$safeDescription')"
    // The returned DataFrame of an INSERT is empty; discard it deliberately
    // (the previous unused `val a` has been removed).
    hc.sql(insertString)
    ()
  }


  /**
   * Returns the expected schema for the source CSV / target Hive table.
   *
   * NOTE(review): `columnName` (the parsed CSV header) is currently ignored and the
   * schema is hard-coded — confirm whether the header should drive the schema.
   */
  def TableASchema(columnName: List[String]): StructType = {
    val columns: Seq[(String, DataType)] = Seq(
      ("datetime", TimestampType),
      ("load_datetime", TimestampType),
      ("source_bank", StringType),
      ("emp_name", StringType),
      ("header_row_count", IntegerType),
      ("emp_hours", DoubleType)
    )
    StructType(columns.map { case (colName, colType) =>
      StructField(colName, colType, nullable = true)
    })
  }

  /**
   * Converts one parsed CSV record (already split on ',') into a Row matching
   * TableASchema: timestamp, date, two strings, an Int and a Double.
   *
   * Fix: the numeric fields are now trimmed like the string fields —
   * `" 5".toInt` throws NumberFormatException, so padded CSV values used to
   * crash the whole job.
   */
  def row(line: List[String]): Row = {
       Row(convertToTimestamp(line(0).trim), convertToDate(line(1).trim), line(2).trim, line(3).trim, line(4).trim.toInt, line(5).trim.toDouble)
    }


  /**
   * Parses a "ddMMMyyyy:HH:mm:ss" string (e.g. "01JAN2020:12:30:45") into a
   * java.sql.Timestamp. Returns null for an empty or unparseable input so the
   * corresponding Row field stays nullable.
   */
  def convertToTimestamp(s: String) : Timestamp = {
    if (s.isEmpty) null
    else {
      // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
      val parser = new SimpleDateFormat("ddMMMyyyy:HH:mm:ss")
      Try(new Timestamp(parser.parse(s).getTime)).getOrElse(null)
    }
  }

   /**
    * Parses a "ddMMMyyyy" date string (e.g. "01JAN2020") into a java.sql.Timestamp
    * at midnight of that day. Returns null for an empty or unparseable input so the
    * corresponding Row field stays nullable.
    */
   def convertToDate(s: String) : Timestamp = {
     if (s.isEmpty) null
     else {
       // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
       val parser = new SimpleDateFormat("ddMMMyyyy")
       Try(new Timestamp(parser.parse(s).getTime)).getOrElse(null)
     }
   }

    /**
     * Recursively flattens nested struct columns into a single array of leaf
     * StructFields, so schemas can be compared field-by-field.
     */
    def flatten(scheme: StructType): Array[StructField] =
      scheme.fields.flatMap {
        case StructField(_, nested: StructType, _, _) => flatten(nested)
        case leaf                                     => Array(leaf)
      }

    /** Returns the current wall-clock time formatted as "yyyy-MM-dd HH:mm:ss". */
    def getTimestamp(): String = {
      val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      formatter.format(new java.util.Date())
    }

Exception is below:

17/12/21 05:18:40 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(8,0,ShuffleMapTask,TaskKilled(stage cancelled),org.apache.spark.scheduler.TaskInfo@78db3052,null)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 8.0 failed 1 times, most recent failure: Lost task 17.0 in stage 8.0 (TID 323, localhost, executor driver): java.lang.OutOfMemoryError: GC overhead limit exceeded

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2430)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2429)
  at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2429)
  ... 53 elided
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded

scala> 17/12/21 05:18:40 ERROR ShutdownHookManager: Exception while deleting Spark temp dir: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
java.io.IOException: Failed to delete: /tmp/spark-6f345216-41df-4fd6-8e3d-e34d49e28f0c
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1031)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1954)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)

回答1:

Your Spark process is spending too much time in garbage collection: most of the CPU is consumed by GC and the processing never completes, because you are running out of executor memory. You can try the options below:

  • Tune the properties `spark.storage.memoryFraction` and `spark.memory.storageFraction`. You can also allocate more resources at submit time, e.g. `spark-submit ... --executor-memory 4096m --num-executors 20`.
  • Or change the GC policy. Check the current GC settings and try the G1 collector by setting `-XX:+UseG1GC`.