I have the following algorithm: read a CSV file in FAILFAST mode with a specified schema, and if the schema does not match the data, handle the resulting SparkException by reading the CSV again without the specified schema.
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

implicit val ss: SparkSession = SparkSession.builder()/*...*/.getOrCreate()
val inputSchema = StructType( ... )
val path = "src/main/resources/test.csv"
try {
  val inputDFStructured = readCSV(path, Some(inputSchema), Map("header" -> "true", "mode" -> "FAILFAST"))
  // ... a number of different transformations on inputDFStructured
  inputDFStructured.write.parquet("...")
} catch {
  case se: SparkException =>
    println("Failed to read using specified schema: " + se.getMessage)
    val inputDFStringSchema = readCSV(path, None, Map("header" -> "true"))
    // ... a number of different transformations on inputDFStringSchema
    inputDFStringSchema.write.parquet("...")
}
def readCSV(path: String, schema: Option[StructType], options: Map[String, String])(implicit ss: SparkSession): DataFrame = {
  // When no schema is passed, orNull leaves Spark with no user-specified schema,
  // so all columns come back as strings (inferSchema is not enabled here)
  ss.read.schema(schema.orNull).options(options).csv(path)
}
Is this code safe with respect to lazy evaluation?
Is it possible that some rows get written to the output path before execution reaches the catch block, because the schema validation exception is only thrown once the write action triggers evaluation?
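If rows can indeed be written first, would eagerly materialising the DataFrame before the write avoid that? Below is a minimal sketch of what I have in mind; the cache()/count() step is my own hypothetical addition, not part of the current job:

// Hypothetical variant: force the FAILFAST parse to complete before any output is written
try {
  val inputDFStructured = readCSV(path, Some(inputSchema), Map("header" -> "true", "mode" -> "FAILFAST")).cache()
  inputDFStructured.count() // full materialisation: a malformed row should make FAILFAST throw here, before the write starts
  inputDFStructured.write.parquet("...")
} catch {
  case se: SparkException =>
    println("Failed to read using specified schema: " + se.getMessage)
    readCSV(path, None, Map("header" -> "true")).write.parquet("...")
}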