I have a Spark job, written in Python, that shows odd behaviour when checking its data for errors. A simplified version is below:
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit

spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

SCHEMA = StructType([
    StructField("headerDouble", DoubleType(), False),
    StructField("ErrorField", StringType(), False)
])

dataframe = (
    spark.read
    .option("header", "true")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "ErrorField")
    .schema(SCHEMA).csv("./x.csv")
)

total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))

errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
```
The CSV it reads is simply:
```
headerDouble
wrong
```
The relevant output is:
```
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
|        null|     wrong|
+------------+----------+
errors count = 0
```
How can this happen? If the filtered dataframe shows a record, how is it being counted as 0? Is this a bug in Spark, or am I missing something?
EDIT: This looks like it might be a known bug in Spark 2.2 that has been fixed in Spark 2.3: https://issues.apache.org/jira/browse/SPARK-21610