Spark dataset reduce with null values?

Posted 2019-09-15 09:30

Question:

I'm creating a DataFrame with this code:

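  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

  val data = List(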
    List(444.1235D),
    List(67.5335D),
    List(69.5335D),
    List(677.5335D),
    List(47.5335D),
    List(null)
  )

  val rdd = sparkContext.parallelize(data).map(Row.fromSeq(_))
  val schema = StructType(Array(
    StructField("value", DataTypes.DoubleType, true)
  ))

  val df = sqlContext.createDataFrame(rdd, schema)

Then I apply my udf to it:

val multip: Dataset[Double] = df.select(doubleUdf(df("value"))).as[Double]

and then I'm trying to use reduce on this dataset:

val multipl = multip.reduce(_ * _)

Here I get 0.0 as the result. I also tried filtering the nulls out:

val multipl = multip.filter(_ != null).reduce(_ * _)

with the same result. If I remove the null value from the data, everything works as it should. How can I make reduce work with null values?

My udf is defined like this:

val doubleUdf: UserDefinedFunction = udf((v: Any) => Try(v.toString.toDouble).toOption)

Answer 1:

I'll answer under the strong assumption that your doubleUdf function converts values to doubles and that, rather than keeping an Option wrapper for nulls, the nulls end up as 0.0. So if you want to drop the nulls, filter them out BEFORE anything else:

df.na.drop.select(doubleUdf(df("value"))).as[Double]
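
To see why a single missing value wipes out the product, here is a minimal plain-Scala sketch without Spark. It reuses the parsing expression from the question's UDF and assumes, as this answer does, that the missing value is read back as 0.0 once the column is typed as Double. The object and value names are made up for illustration. Note also that a primitive Double is never null, so a filter like `_ != null` on a Dataset[Double] cannot remove anything.

```scala
import scala.util.Try

object NullProductSketch {
  // Same inputs as in the question, as a plain list; the last entry is the null.
  val raw: List[Any] = List(444.1235, 67.5335, 69.5335, 677.5335, 47.5335, null)

  // Same parsing expression as the question's UDF: null.toString throws, so Try yields None.
  val parsed: List[Option[Double]] = raw.map(v => Try(v.toString.toDouble).toOption)

  // Assumption: the missing value shows up as 0.0 once the data is typed as Double.
  val nullsAsZero: List[Double] = parsed.map(_.getOrElse(0.0))

  // Dropping the missing value before reducing keeps only the five real numbers.
  val nullsDropped: List[Double] = parsed.flatten

  def main(args: Array[String]): Unit = {
    println(nullsAsZero.reduce(_ * _))   // prints 0.0
    println(nullsDropped.reduce(_ * _))  // product of the five non-null values
  }
}
```

The second reduce corresponds to what df.na.drop achieves here: the null row is gone before any multiplication happens.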


Answer 2:

First, I would ask why you are even dealing with null at all. I would evaluate the way I'm reading the data to ensure that doesn't happen.

Then I would note that you can eliminate the null from your in-memory List before you even get to the RDD level, for example like this:

data.flatten.flatMap(Option(_))

But if you must deal with null at the RDD level, you have options (no pun intended):

sparkContext.parallelize(data).filter(!_.contains(null))

or

sparkContext.parallelize(data).map(_.flatMap(Option(_))).filter(_.nonEmpty)

I prefer the latter. I don't like looking at null in Scala code.
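
The two filters are not equivalent when a row mixes real values with a null. A minimal sketch on a plain List (the same lambdas can be passed to an RDD's filter and map); the sample rows, including the mixed last row, are hypothetical and not taken from the question:

```scala
object RowNullFiltering {
  // Hypothetical sample rows; the last row mixes a real value with a null.
  val rows: List[List[Any]] =
    List(List(444.1235), List(67.5335), List(null), List(1.5, null))

  // First variant: discard every row that contains a null anywhere.
  val wholeRowsDropped: List[List[Any]] = rows.filter(!_.contains(null))

  // Second variant: strip nulls inside each row, then discard rows left empty.
  val nullsStripped: List[List[Any]] =
    rows.map(_.flatMap(Option(_))).filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    println(wholeRowsDropped) // prints List(List(444.1235), List(67.5335))
    println(nullsStripped)    // prints List(List(444.1235), List(67.5335), List(1.5))
  }
}
```

With single-column rows like those in the question, both variants give the same result. The second one also changes the length of mixed rows, which matters if the rows are later matched against a fixed schema.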

I would stay away from a UDF-based solution, since Spark can't optimize UDFs, and it would be a shame to lose Spark's optimization capabilities over something as lame as null.