Spark SQL exception handling

Published 2020-05-24 06:31

Question:

To handle Spark exceptions in RDD operations, I can use the following approach with an additional exceptions column:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.util.{Failure, Success, Try}

val df: DataFrame = ...

// For every row, try to parse member_id as an Int; on success keep the parsed value,
// on failure record the exception message in a separate "exceptions" column.
val rddWithExcep = df.rdd.map { row: Row =>
  val memberIdStr = row.getAs[String]("member_id")
  val memberIdInt = Try(memberIdStr.toInt) match {
    case Success(integer) => List(integer, null)
    case Failure(ex)      => List(null, ex.toString)
  }
  Row.fromSeq(row.toSeq.toList ++ memberIdInt)
}

// Extend the original schema with the two new columns.
val castWithExceptionSchema = StructType(df.schema.fields ++ Array(
  StructField("member_id_int", IntegerType, true),
  StructField("exceptions", StringType, true)))

val castExcepDf = sparkSession.sqlContext.createDataFrame(rddWithExcep, castWithExceptionSchema)

castExcepDf.printSchema()
castExcepDf.show()

Is it possible to handle such exceptions in Spark SQL? Currently, in case of any error, Spark SQL simply returns a null value and hides the error.

For example, division by 0 results in a null value and not in an error. In my opinion this is a very serious issue in Spark SQL, because it can silently produce unexpected/wrong data that you won't even notice.
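To illustrate, here is a minimal, self-contained sketch of that silent-null behaviour (the object name, app name and local master are only assumptions made to keep the example runnable; a real job would use its own session setup):

import org.apache.spark.sql.SparkSession

object DivisionByZeroDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("division-by-zero-demo") // hypothetical name, for illustration only
      .master("local[*]")
      .getOrCreate()

    // With Spark SQL's default behaviour, dividing by zero does not throw:
    // the expression simply evaluates to null and the job keeps running.
    spark.sql("SELECT 10 / 0 AS result").show()

    spark.stop()
  }
}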

Is it possible to override this behavior and let Spark fail with an appropriate detailed exception?