spark sql distance to nearest holiday

Posted 2019-09-13 07:57

Question:

In pandas I have a function similar to

indices = df.dateColumn.apply(holidays.index.searchsorted)
df['nextHolidays'] = holidays.index[indices]
df['previousHolidays'] = holidays.index[indices - 1]

which looks up the next and previous holiday for each row, so that the distance to the nearest holiday can be calculated and stored as a new column.

searchsorted (http://pandas.pydata.org/pandas-docs/version/0.18.1/generated/pandas.Series.searchsorted.html) was a great solution for pandas because it gives me the index of the next holiday without high algorithmic complexity. For example, this approach was a lot quicker than the parallel looping described in "Parallelize pandas apply".

How can I achieve this in spark or hive?

Answer 1:

This can be done using aggregations, but that method would have higher complexity than the pandas method. You can achieve similar performance using UDFs. It won't be as elegant as pandas, but it works.

Assuming this dataset of holidays:

holidays = ['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03']
index = spark.sparkContext.broadcast(sorted(holidays))

And a dataset of all the dates of 2016 in a DataFrame:

from datetime import datetime, timedelta
dates_array = [(datetime(2016, 1, 1) + timedelta(i)).strftime('%Y-%m-%d') for i in range(366)]
from pyspark.sql import Row
df = spark.createDataFrame([Row(date=d) for d in dates_array])

The UDF could use pandas searchsorted, but that would require installing pandas on the executors. Instead, you can use plain Python like this:

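def nearest_holiday(date):
    # Linear scan over the broadcast, sorted list of 'YYYY-MM-DD' strings;
    # ISO-formatted dates compare correctly as plain strings.
    last_holiday = index.value[0]
    for next_holiday in index.value:
        if next_holiday >= date:
            break  # first holiday on or after the date
        last_holiday = next_holiday
    # No holiday on or before the date
    if last_holiday > date:
        last_holiday = None
    # No holiday on or after the date (the loop ended without a break)
    if next_holiday < date:
        next_holiday = None
    return (last_holiday, next_holiday)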
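The loop above scans the holiday list linearly for every row. To get closer to what searchsorted does, a binary search with the standard-library bisect module is one option, and it still needs no pandas on the executors. The following is only a sketch: the name nearest_holiday_bisect and the explicit sorted_holidays parameter are introduced here for illustration and are not part of the original answer. Note one behavioural difference: here a date that is itself a holiday gets that holiday as both neighbours, whereas the loop above does so only for the first holiday in the list.

```python
from bisect import bisect_left, bisect_right

def nearest_holiday_bisect(date, sorted_holidays):
    """Return (last_holiday, next_holiday) for an ISO 'YYYY-MM-DD' string.

    last_holiday is the latest holiday <= date and next_holiday the
    earliest holiday >= date; either is None if no such holiday exists.
    sorted_holidays must be an ascending list of ISO date strings.
    """
    i = bisect_right(sorted_holidays, date)  # count of holidays <= date
    j = bisect_left(sorted_holidays, date)   # count of holidays < date
    last_holiday = sorted_holidays[i - 1] if i > 0 else None
    next_holiday = sorted_holidays[j] if j < len(sorted_holidays) else None
    return (last_holiday, next_holiday)

# Hypothetical local check with the holidays used in this answer
holidays_sorted = sorted(['2016-01-03', '2016-09-09', '2016-12-12', '2016-03-03'])
print(nearest_holiday_bisect('2016-01-04', holidays_sorted))
```

Inside Spark, the broadcast list would presumably be passed in from a small wrapper, for example a lambda that calls nearest_holiday_bisect(d, index.value), and registered as a UDF in the same way as nearest_holiday.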


from pyspark.sql.types import StructType, StructField, StringType
# The UDF returns a struct with two nullable string fields
return_type = StructType([StructField('last_holiday', StringType()), StructField('next_holiday', StringType())])

from pyspark.sql.functions import udf
nearest_holiday_udf = udf(nearest_holiday, return_type)

It can then be used with withColumn:

df.withColumn('holiday', nearest_holiday_udf('date')).show(5, False)

+----------+-----------------------+
|date      |holiday                |
+----------+-----------------------+
|2016-01-01|[null,2016-01-03]      |
|2016-01-02|[null,2016-01-03]      |
|2016-01-03|[2016-01-03,2016-01-03]|
|2016-01-04|[2016-01-03,2016-03-03]|
|2016-01-05|[2016-01-03,2016-03-03]|
+----------+-----------------------+
only showing top 5 rows
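
The struct above holds the neighbouring holidays rather than the distance itself, which is what the question asks for. A minimal plain-Python sketch of that last step follows; the helper name days_to_nearest_holiday is hypothetical and not part of the original answer. It assumes ISO 'YYYY-MM-DD' strings and treats a missing neighbour as None.

```python
from datetime import datetime

def days_to_nearest_holiday(day, last_holiday, next_holiday):
    """Return the number of days from `day` to the closer of the two
    holidays, or None when both neighbours are missing."""
    fmt = '%Y-%m-%d'
    d = datetime.strptime(day, fmt)
    gaps = [
        abs((datetime.strptime(h, fmt) - d).days)
        for h in (last_holiday, next_holiday)
        if h is not None  # skip a missing previous or next holiday
    ]
    return min(gaps) if gaps else None

# 2016-01-04 lies one day after 2016-01-03 and well before 2016-03-03
print(days_to_nearest_holiday('2016-01-04', '2016-01-03', '2016-03-03'))
```

In Spark this could be wrapped in a second UDF with an integer return type, or the same result could probably be expressed with the built-in datediff function on the two struct fields, which avoids another Python round trip.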