Fill in null with previously known good value with

2019-01-17 12:32发布

问题:

Is there a way to replace null values in pyspark dataframe with the last valid value? There is addtional timestamp and session columns if you think you need them for windows partitioning and ordering. More specifically, I'd like to achieve the following conversion:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110|
|        1|          8|       null|      |        1|          8|        110|
|        1|          9|       null|      |        1|          9|        110|
|        1|         10|       null|      |        1|         10|        110|
+---------+-----------+-----------+      +---------+-----------+-----------+

回答1:

This seems to be doing the trick using Window functions:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

def fill_nulls(df):
    df_na = df.na.fill(-1)
    lag = df_na.withColumn('id_lag', func.lag('id', default=-1)\
                           .over(Window.partitionBy('session')\
                                 .orderBy('timestamp')))

    switch = lag.withColumn('id_change',
                            ((lag['id'] != lag['id_lag']) &
                             (lag['id'] != -1)).cast('integer'))


    switch_sess = switch.withColumn(
        'sub_session',
        func.sum("id_change")
        .over(
            Window.partitionBy("session")
            .orderBy("timestamp")
            .rowsBetween(-sys.maxsize, 0))
    )

    fid = switch_sess.withColumn('nn_id',
                           func.first('id')\
                           .over(Window.partitionBy('session', 'sub_session')\
                                 .orderBy('timestamp')))

    fid_na = fid.replace(-1, 'null')

    ff = fid_na.drop('id').drop('id_lag')\
                          .drop('id_change')\
                          .drop('sub_session').\
                          withColumnRenamed('nn_id', 'id')

    return ff

Here is the full null_test.py.



回答2:

I believe I have a much simpler solution than the accepted. It is using Functions too, but uses the function called 'LAST' and ignores nulls.

Let's re-create something similar to the original data:

import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

d = [{'session': 1, 'ts': 1}, {'session': 1, 'ts': 2, 'id': 109}, {'session': 1, 'ts': 3}, {'session': 1, 'ts': 4, 'id': 110}, {'session': 1, 'ts': 5},  {'session': 1, 'ts': 6}]
df = spark.createDataFrame(d)

This prints:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3|null|
|      1|  4| 110|
|      1|  5|null|
|      1|  6|null|
+-------+---+----+

Now, if we use the window function LAST:

df.withColumn("id", func.last('id', True).over(Window.partitionBy('session').orderBy('ts').rowsBetween(-sys.maxsize, 0))).show()

We just get:

+-------+---+----+
|session| ts|  id|
+-------+---+----+
|      1|  1|null|
|      1|  2| 109|
|      1|  3| 109|
|      1|  4| 110|
|      1|  5| 110|
|      1|  6| 110|
+-------+---+----+

Hope it helps!



回答3:

@Oleksiy's answer is great, but didn't fully work for my requirements. Within a session, if multiple nulls are observed, all are filled with the first non-null for the session. I needed the last non-null value to propagate forward.

The following tweak worked for my use case:

def fill_forward(df, id_column, key_column, fill_column):

    # Fill null's with last *non null* value in the window
    ff = df.withColumn(
        'fill_fwd',
        func.last(fill_column, True) # True: fill with last non-null
        .over(
            Window.partitionBy(id_column)
            .orderBy(key_column)
            .rowsBetween(-sys.maxsize, 0))
        )

    # Drop the old column and rename the new column
    ff_out = ff.drop(fill_column).withColumnRenamed('fill_fwd', fill_column)

    return ff_out