Spark: read CSV in FAILFAST mode and re-read in catch block

Posted 2019-08-26 18:49

Question:

I have the following algorithm: read from a CSV file in FAILFAST mode with a specified schema. If the schema is wrong, handle the SparkException by reading the CSV again (without the specified schema).

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

implicit val ss: SparkSession = SparkSession.builder()/*...*/.getOrCreate()

val inputSchema = StructType( ... )
val path = "src/main/resources/test.csv"

try {
  val inputDFStructured = readCSV(path, Some(inputSchema), Map("header" -> "true", "mode" -> "FAILFAST"))

  //... number of different transformations with inputDFStructured

  inputDFStructured.write.parquet("...")

} catch {
  case se: SparkException => {
    println("Failed to read using specified schema: " + se.getMessage)

    val inputDFStringSchema = readCSV(path, None, Map("header" -> "true"))

    //... number of different transformations with inputDFStringSchema

    inputDFStringSchema.write.parquet("...")
  }
}

def readCSV(path: String, schema: Option[StructType], options: Map[String, String])(implicit ss: SparkSession): DataFrame = {
  ss.read.schema(schema.orNull).options(options).csv(path)
}
  1. Is this code safe considering lazy evaluation?

  2. Is it possible that some lines get written to the output path before execution reaches the catch block, because the schema validation exception is only thrown once an action runs (due to lazy evaluation or something similar)? One safeguard I am considering is sketched below.
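
The sketch (using the same imports as above) forces the FAILFAST parsing to happen eagerly inside the try block, before any write starts. It assumes that caching the DataFrame and then running count() materializes every column and therefore triggers the schema validation up front; the helper name readValidatedCSV is mine, not part of the code above.

def readValidatedCSV(path: String, schema: StructType, options: Map[String, String])(implicit ss: SparkSession): DataFrame = {
  // Cache so the CSV is only read once: the validating action below and the
  // later write both reuse the cached data.
  val df = ss.read.schema(schema).options(options).csv(path).cache()
  // Action: materializing the cache parses the rows with the given schema,
  // so under FAILFAST a malformed file should throw here, before any output
  // path is touched. (Assumption: building the cache parses all columns.)
  df.count()
  df
}

The idea is that count() on the cached DataFrame pushes the whole file through the FAILFAST parser, so the SparkException would be raised before write.parquet is ever called, and the cached data can then be written without a second read of the CSV. I am not sure whether this is actually necessary, or sufficient, which is what questions 1 and 2 are about.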