Pyspark: forward fill with last observation for a DataFrame

Published 2019-01-11 21:41

Question:

Using Spark 1.5.1,

I've been trying to forward fill null values with the last known observation for one column of my DataFrame.

It is possible for a group to start with a null value, and in that case I would like to backward fill it with the first known observation. However, if that complicates the code too much, this point can be skipped.

In this post, zero323 provided a Scala solution for a very similar problem.

But I don't know Scala, and I haven't managed to "translate" it into PySpark API code. Is it possible to do this with PySpark?

Thanks for your help.

Below is a simple sample input:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | null 
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | null   
| 1             | 2015-12-05 | null     
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | null
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | null      
| 2             | 2015-12-03 | null     
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | null   
| 2             | 2015-12-06 | U4

And the expected output:

| cookie_ID     | Time       | User_ID   
| ------------- | --------   |------------- 
| 1             | 2015-12-01 | U1
| 1             | 2015-12-02 | U1
| 1             | 2015-12-03 | U1
| 1             | 2015-12-04 | U1
| 1             | 2015-12-05 | U1
| 1             | 2015-12-06 | U2
| 1             | 2015-12-07 | U2
| 1             | 2015-12-08 | U1
| 1             | 2015-12-09 | U1
| 2             | 2015-12-03 | U3
| 2             | 2015-12-04 | U3
| 2             | 2015-12-05 | U3
| 2             | 2015-12-06 | U4
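
For reference, on more recent Spark releases (2.x and later) the window-function approach from the linked Scala answer translates almost directly to PySpark. A minimal sketch, assuming the sample above is loaded into a DataFrame df with columns cookie_ID, Time and User_ID, and that last/first accept ignorenulls (Spark 2.x); the coalesce handles the optional backward fill of a leading null:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Last non-null User_ID up to and including the current row, per cookie.
ffill_win = (Window.partitionBy("cookie_ID")
                   .orderBy("Time")
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# First non-null User_ID from the current row onwards (for leading nulls).
bfill_win = (Window.partitionBy("cookie_ID")
                   .orderBy("Time")
                   .rowsBetween(Window.currentRow, Window.unboundedFollowing))

filled = df.withColumn(
    "User_ID",
    F.coalesce(F.last("User_ID", ignorenulls=True).over(ffill_win),
               F.first("User_ID", ignorenulls=True).over(bfill_win)))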

Answer 1:

Below, the partitioned example code from Spark / Scala: forward fill with last observation is shown in PySpark. This approach only works for data that can be partitioned (here, by cookie_id).

Load the data

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()

The DataFrame is

+---------+----------+-------+
|cookie_id|    c_date|user_id|
+---------+----------+-------+
|        1|2015-12-01|   null|
|        1|2015-12-02|     U1|
|        1|2015-12-02|     U1|
|        1|2015-12-03|     U2|
|        1|2015-12-04|   null|
|        1|2015-12-05|   null|
|        2|2015-12-04|   null|
|        2|2015-12-03|   null|
|        2|2015-12-02|     U3|
|        2|2015-12-05|   null|
+---------+----------+-------+

The column used to sort the rows within each group

# get the sort key
def getKey(item):
    return item.c_date

The fill function. It can be extended to fill multiple columns if necessary.

# fill function
def fill(x):
    out = []
    last_val = None
    for v in x:
        if v["user_id"] is None:
            data = [v["cookie_id"], v["c_date"], last_val]
        else:
            data = [v["cookie_id"], v["c_date"], v["user_id"]]
            last_val = v["user_id"]
        out.append(data)
    return out
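
A quick, hypothetical check of what fill does on its own, using pyspark.sql.Row objects; note how a leading null stays None, which is why the backward fill was left optional in the question:

from pyspark.sql import Row

sample = [Row(cookie_id=1, c_date="2015-12-01", user_id=None),
          Row(cookie_id=1, c_date="2015-12-02", user_id="U1"),
          Row(cookie_id=1, c_date="2015-12-03", user_id=None)]
print(fill(sample))
# [[1, '2015-12-01', None], [1, '2015-12-02', 'U1'], [1, '2015-12-03', 'U1']]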

Convert to an RDD, group by cookie_id, sort by date, and fill the missing values

# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])

Convert back to DataFrame

df_out = sqlContext.createDataFrame(rdd)
df_out.show()

The output is

+---+----------+----+
| _1|        _2|  _3|
+---+----------+----+
|  1|2015-12-01|null|
|  1|2015-12-02|  U1|
|  1|2015-12-02|  U1|
|  1|2015-12-03|  U2|
|  1|2015-12-04|  U2|
|  1|2015-12-05|  U2|
|  2|2015-12-02|  U3|
|  2|2015-12-03|  U3|
|  2|2015-12-04|  U3|
|  2|2015-12-05|  U3|
+---+----------+----+
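
Since createDataFrame is called without column names, the output columns get the generic names _1, _2, _3. To get the original names back, the schema can be passed explicitly (a small tweak, not part of the original answer):

df_out = sqlContext.createDataFrame(rdd, ["cookie_id", "c_date", "user_id"])
df_out.show()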


Answer 2:

Hopefully you find this forward fill function useful. It is written using only native PySpark functions; neither a UDF nor an RDD is used (both are very slow, especially UDFs!). The idea is to build a cumulative count that increases at every non-null value, so each null row ends up in the same group as the last non-null row before it; within each group, the first value in order is then used as the fill value.

Let's use the example provided by @Sid.

values = [
    (1, "2015-12-01", None),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-02", "U1"),
    (1, "2015-12-03", "U2"),
    (1, "2015-12-04", None),
    (1, "2015-12-05", None),
    (2, "2015-12-04", None),
    (2, "2015-12-03", None),
    (2, "2015-12-02", "U3"),
    (2, "2015-12-05", None),
] 

df = spark.createDataFrame(values, ['cookie_ID', 'Time', 'User_ID'])

Functions (the helpers below assume these imports):

from pyspark.sql import Window
from pyspark.sql.functions import col, lit, when, sum, collect_list
from pyspark.sql.types import StringType

def cum_sum(df, sum_col, order_col, cum_sum_col_nm='cum_sum'):
    '''Find the cumulative sum of a column.
    Parameters
    -----------
    sum_col : String
        Column to perform the cumulative sum on.
    order_col : List
        Column/columns to sort by for the cumulative sum.
    cum_sum_col_nm : String
        The name of the resulting cumulative-sum column.

    Return
    -------
    df : DataFrame
        Dataframe with the additional "cum_sum_col_nm" column.

    '''
    # Constant column so the window spans the whole DataFrame.
    df = df.withColumn('tmp', lit('tmp'))

    windowval = (Window.partitionBy('tmp')
                 .orderBy(order_col)
                 .rangeBetween(Window.unboundedPreceding, 0))

    df = df.withColumn(cum_sum_col_nm, sum(sum_col).over(windowval).cast(StringType()))
    df = df.drop('tmp')
    return df
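
A quick, hypothetical sanity check of cum_sum on a toy DataFrame (reusing the spark session and imports above): the running sum only advances on non-zero rows, which is what lets forward_fill group a run of nulls with the last non-null row before it.

toy = spark.createDataFrame(
    [("a", 1, 0), ("a", 2, 1), ("a", 3, 0)],
    ["grp", "t", "value"])
cum_sum(toy, "value", ["grp", "t"]).show()
# cum_sum column (cast to string by the helper): "0", "1", "1"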


def forward_fill(df, order_col, fill_col, fill_col_name=None):
    '''Forward fill a column by a column/set of columns (order_col).  
    Parameters:
    ------------
    df: Dataframe 
    order_col: String or List of string
    fill_col: String (Only works for a single column in this version.)

    Return:
    ---------
    df: Dataframe 
        Return df with the filled_cols. 
    '''

    # "value" and "constant" are tmp columns created ton enable forward fill. 
    df = df.withColumn('value', when(col(fill_col).isNull(), 0).otherwise(1))
    df = cum_sum(df, 'value', order_col).drop('value')  
    df = df.withColumn(fill_col, 
                when(col(fill_col).isNull(), 'constant').otherwise(col(fill_col))) 

    win = (Window.partitionBy('cum_sum') 
              .orderBy(order_col)) 

    if not fill_col_name:
        fill_col_name = 'ffill_{}'.format(fill_col)

    df = df.withColumn(fill_col_name, collect_list(fill_col).over(win)[0])
    df = df.drop('cum_sum')
    df = df.withColumn(fill_col_name, when(col(fill_col_name)=='constant', None).otherwise(col(fill_col_name)))
    df = df.withColumn(fill_col, when(col(fill_col)=='constant', None).otherwise(col(fill_col)))
    return df   

Let's see the results.

ffilled_df = forward_fill(df, 
                          order_col=['cookie_ID', 'Time'], 
                          fill_col='User_ID', 
                          fill_col_name = 'User_ID_ffil')
ffilled_df.sort(['cookie_ID', 'Time']).show()   



Answer 3:

Cloudera has released a library called spark-ts that offers a suite of useful methods for processing time series and sequential data in Spark. This library supports a number of time-windowed methods for imputing data points based on other data in the sequence.

http://blog.cloudera.com/blog/2015/12/spark-ts-a-new-library-for-analyzing-time-series-data-with-apache-spark/