I'm creating a DataFrame with this code:
val data = List(
List(444.1235D),
List(67.5335D),
List(69.5335D),
List(677.5335D),
List(47.5335D),
List(null)
)
val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
val schema = StructType(Array(
StructField("value", DataTypes.DoubleType, true)
))
val df = sqlContext.createDataFrame(rdd, schema)
Then I apply my udf to it:
val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]
and then I'm trying to use reduce on this dataset:
val multipl = multip.reduce(_ * _)
And here I got 0.0 as a result.
I also tried to filter the nulls out
val multipl = multip.filter(_ != null).reduce(_ * _)
with the same result.
If I remove the null value from the data, everything works as it should. How can I make reduce work with null values?
My udf is defined like this:
val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)
I'll answer with a strong assumption: even though your doubleUdf wraps its result in an Option, by the time the column is read back as a Dataset[Double] the null has already become 0.0, because a Scala Double cannot hold null. That is why filtering the Dataset afterwards changes nothing. So if you want to drop the nulls, filter BEFORE anything else:
df.na.drop.select(doubleUdf(df("value"))).as[Double]
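To make that concrete, here is a minimal sketch of the whole pipeline with the drop applied up front, reusing the df and doubleUdf from your question and assuming import sqlContext.implicits._ is in scope for the Double encoder:

val cleaned: Dataset[Double] = df.na.drop().select(doubleUdf(df("value"))).as[Double]
// With the null row gone before the UDF runs, no 0.0 sneaks into the product.
val product = cleaned.reduce(_ * _)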
First, I would ask why you are even dealing with null at all. I would evaluate the way I'm reading the data to ensure that doesn't happen.
Then I would note that you can eliminate the null from your in-memory List before you even get to the RDD level, for example like this:
data.map(_.flatMap(Option(_))).filter(_.nonEmpty)
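For concreteness, binding that cleanup to a name shows what is left (a sketch against the data list above):

val cleanData = data.map(_.flatMap(Option(_))).filter(_.nonEmpty)
// cleanData: List(List(444.1235), List(67.5335), List(69.5335), List(677.5335), List(47.5335))

The rows keep their List shape, so the parallelize, Row.fromSeq, and createDataFrame steps from your question work unchanged.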
But if you must deal with null at the RDD level, you have options (no pun intended):
sparkContext.parallelize(data).filter(!_.contains(null))
or
sparkContext.parallelize(data).map(_.flatMap(Option(_))).filter(_.nonEmpty)
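Either way, the rest of your pipeline stays the same. Here is a sketch of the second option wired all the way through, again reusing the names from your question (sparkContext, sqlContext, schema, doubleUdf) and assuming import sqlContext.implicits._ for the Double encoder:

val cleanRdd = sparkContext.parallelize(data)
  .map(_.flatMap(Option(_)))   // strip nulls inside each row
  .filter(_.nonEmpty)          // drop rows that held only a null
  .map(Row.fromSeq(_))
val cleanDf = sqlContext.createDataFrame(cleanRdd, schema)
// product of the five remaining values, not 0.0
val product = cleanDf.select(doubleUdf(cleanDf("value"))).as[Double].reduce(_ * _)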
I prefer the latter. I don't like looking at null in Scala code. I would stay away from a UDF-based solution since Spark can't optimize UDFs, and it is a shame to lose Spark's optimization capabilities over something as lame as null.
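For example, if the column really is numeric, one UDF-free sketch (my assumption, not your original conversion logic) is to let a built-in cast stand in for the toString.toDouble conversion and let na.drop handle the nulls, so everything stays inside Catalyst; it again assumes import sqlContext.implicits._ for the Double encoder:

import org.apache.spark.sql.functions.col

val product = df.na.drop()
  .select(col("value").cast("double"))   // cast replaces the UDF; a no-op here since the column is already DoubleType
  .as[Double]
  .reduce(_ * _)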