I have a Spark Dataframe with some missing values. I would like to perform a simple imputation by replacing the missing values with the mean for that column. I am very new to Spark, so I have been struggling to implement this logic. This is what I have managed to do so far:
a) To do this for a single column (let's say ColA), this code seems to work:
```scala
df.withColumn("new_Col",
  when($"ColA".isNull, df.select(mean("ColA")).first()(0).asInstanceOf[Double])
    .otherwise($"ColA"))
```
b) However, I have not been able to figure out how to do this for all the columns in my dataframe. I was trying out the map function, but I believe it loops through each row of a dataframe.
c) There is a similar question on SO - here. While I liked the solution (using aggregated tables and coalesce), I was keen to know if there is a way to do this by looping through each column (I come from R, so looping through each column using a higher-order function like lapply seems more natural to me).
Thanks!
Spark >= 2.2
You can use org.apache.spark.ml.feature.Imputer (which supports both the mean and the median strategy).

Scala:
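A minimal sketch, assuming every column of df is numeric (Imputer only handles numeric columns); the "_imputed" output column names are just a choice:

```scala
import org.apache.spark.ml.feature.Imputer

// Impute every column, writing the results to new "<col>_imputed" columns.
// setStrategy accepts "mean" or "median".
val imputer = new Imputer()
  .setInputCols(df.columns)
  .setOutputCols(df.columns.map(c => s"${c}_imputed"))
  .setStrategy("mean")

imputer.fit(df).transform(df)
```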
Python:
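The PySpark equivalent, under the same assumptions:

```python
from pyspark.ml.feature import Imputer

# Impute every column, writing the results to new "<col>_imputed" columns
imputer = Imputer(
    inputCols=df.columns,
    outputCols=["{}_imputed".format(c) for c in df.columns],
    strategy="mean",
)
imputer.fit(df).transform(df)
```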
Spark < 2.2
Here you are:
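A sketch of the idea, assuming every column of df is numeric so that a mean is defined for it:

```scala
import org.apache.spark.sql.functions.mean

// Compute the per-column means, zip them with the column names into a Map,
// and hand that Map to na.fill
df.na.fill(
  df.columns.zip(
    df.select(df.columns.map(mean(_)): _*).first.toSeq
  ).toMap
)
```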
where df.columns.map(mean(_)) computes an average for each column, .first.toSeq collects the aggregated values and converts the row to a Seq[Any] (I know it is suboptimal but this is the API we have to work with), df.columns.zip(...).toMap creates a Map: Map[String, Any] which maps each column name to its average, and finally df.na.fill(...) fills the missing values using the Map[String, Any] variant of fill from DataFrameNaFunctions.

To ignore NaN entries you can replace

df.select(df.columns.map(mean(_)): _*).first.toSeq

with an expression that skips NaN values before averaging.
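A sketch of that replacement, turning NaN into null (which avg ignores) with when and isnan; it assumes the columns are numeric:

```scala
import org.apache.spark.sql.functions.{avg, col, isnan, when}

// NaN entries become null, and avg skips nulls
df.select(df.columns.map(
  c => avg(when(!isnan(col(c)), col(c))).alias(c)
): _*).first.toSeq
```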
For PySpark, this is the code I used:
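Roughly the following, assuming every column of df is numeric:

```python
# Map each column to the "mean" aggregate, compute all means in one pass,
# strip the "avg(...)" wrapper from the result keys, then fill the nulls
mean_dict = {c: "mean" for c in df.columns}
col_avgs = df.agg(mean_dict).collect()[0].asDict()
col_avgs = {k[4:-1]: v for k, v in col_avgs.items()}
df = df.fillna(col_avgs)
```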
The four steps are:
1. Create the dictionary mean_dict mapping column names to the aggregate operation (mean).
2. Calculate the mean for each column and save it as the dictionary col_avgs.
3. The column names in col_avgs start with avg( and end with ), e.g. avg(col1). Strip the parentheses out.
4. Fill the missing values of the dataframe from col_avgs.
For imputing the median (instead of the mean) in PySpark < 2.2:
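One option is to compute an approximate per-column median with approxQuantile. A sketch, where the numeric type names and the relative error are assumptions to adjust:

```python
# Keep only the numeric columns (type names assumed; adjust to your schema)
num_cols = [c for c, t in df.dtypes if t in ("int", "bigint", "float", "double")]

# Approximate median of each column: the 0.5 quantile with a small relative error
median_dict = {c: df.stat.approxQuantile(c, [0.5], 0.001)[0] for c in num_cols}
```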
Then, apply na.fill with that dictionary:
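Continuing the sketch above:

```python
# Replace nulls in the numeric columns with their approximate medians
df_imputed = df.na.fill(median_dict)
```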