I am using Spark 2.0.1 and want to fill null values with the last known good value in the column.
The only references for Spark I could find are Spark / Scala: forward fill with last observation and Fill in null with previously known good value with pyspark, which both seem to use RDDs.
I would rather stay in the DataFrame / Dataset world and possibly handle multiple consecutive null values. Is this possible?
My assumption is that the data (initially loaded from, e.g., a CSV file) is ordered by time, and that this order is preserved in the distributed setting, i.e. filling with the closest / last known good value is correct. Maybe filling with the previous value is enough, as for most records there are no two or more null records in a row. Does this actually hold? The point is that
myDf.sort("foo").show
would destroy any order, e.g., all null values would come first.
A small example:
import java.sql.Date
import spark.implicits._ // for toDF, 'symbol columns and .as[FooBar]

case class FooBar(foo: Date, bar: String)

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
  .toDF("foo", "bar")
  .withColumn("foo", 'foo.cast("Date")) // the malformed date becomes null
  .as[FooBar]
This results in:
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
I would like to fix the value with the last known good value. How can I achieve this? The desired result is:
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
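For the general case with several consecutive null values, I wonder whether a running window combined with last and ignoreNulls could do the trick. Here is a minimal, untested sketch of that idea (rowId, fillWindow and fooFilled are just names I made up for illustration):
indexed = myDf.withColumn("rowId", monotonically_increasing_id())
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{last, monotonically_increasing_id}

// Attach an index to freeze the current row order, then look back from the
// start of the frame to the current row and take the latest non-null foo.
val indexed = myDf.withColumn("rowId", monotonically_increasing_id())
val fillWindow = Window.orderBy('rowId).rowsBetween(Long.MinValue, 0)

indexed
  .withColumn("fooFilled", last('foo, ignoreNulls = true) over fillWindow)
  .show
But I am not sure this is the idiomatic way, and it presumably suffers from the same single-partition problem as the window attempt below.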
edit
In my case, it would be good enough to fill in the value from the row above, as there are only very few faulty values.
edit2
I tried to add an index column:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, monotonically_increasing_id}

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
  .toDF("foo", "bar")
  .withColumn("foo", 'foo.cast("Date"))
  .as[FooBar]
  .withColumn("rowId", monotonically_increasing_id())
and then to fill in the previous value via lag over a window ordered by that index:
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
But that produces the following warning: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. How could I introduce meaningful partitions?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
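For the single-null case, I assume the remaining step would be to coalesce foo with the lagged value, roughly like this sketch (still with the single-partition problem, since the window is ordered globally):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}

// Keep foo where it is valid, otherwise fall back to the previous row's
// value. Note this only repairs a single null, not consecutive nulls.
myDf
  .withColumn("foo", coalesce('foo, lag('foo, 1) over Window.orderBy('rowId)))
  .show
Is this the right direction, and if so, how do I avoid pulling everything into one partition?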