How to subtract a column of days from a column of dates in PySpark?

Question:

Given the following PySpark DataFrame

# date_col holds date strings, days_col the number of days to subtract
df = sqlContext.createDataFrame([('2015-01-15', 10),
                                 ('2015-02-15', 5)],
                                ('date_col', 'days_col'))

How can the days column be subtracted from the date column? In this example, the resulting column should be ['2015-01-05', '2015-02-10'].

I looked into pyspark.sql.functions.date_sub(), but it takes a date column and a single integer for the number of days, e.g. date_sub(df['date_col'], 10). Ideally, I'd prefer to do date_sub(df['date_col'], df['days_col']).

I also tried creating a UDF:

from datetime import timedelta
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType

def subtract_date(start_date, days_to_subtract):
    return start_date - timedelta(days=days_to_subtract)

subtract_date_udf = udf(subtract_date, DateType())
df.withColumn('subtracted_dates', subtract_date_udf(df['date_col'], df['days_col']))

This technically works, but I've read that stepping between Spark and Python can cause performance issues for large datasets. I can stick with this solution for now (no need to prematurely optimize), but my gut says there's just got to be a way to do this simple thing without using a Python UDF.

Answer 1:

I was able to solve this using selectExpr.

df.selectExpr('date_sub(date_col, days_col) as subtracted_dates')

If you want to append the column to the original DataFrame, just add * to the expression list:

df.selectExpr('*', 'date_sub(date_col, days_col) as subtracted_dates')
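
For reference, a quick end-to-end check against the example DataFrame (expected values taken from the question; the .show() layout is approximate):

df.selectExpr('date_sub(date_col, days_col) as subtracted_dates').show()

# +----------------+
# |subtracted_dates|
# +----------------+
# |      2015-01-05|
# |      2015-02-10|
# +----------------+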


Answer 2:

Use the expr function (if the number of days to subtract comes from another column):

>>> from pyspark.sql.functions import expr, date_sub
>>> df.withColumn('subtracted_dates', expr("date_sub(date_col, days_col)"))

Use the withColumn function directly (if the number of days is a literal):

>>> df.withColumn('subtracted_dates', date_sub('date_col', <int_literal_value>))
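
As a side note, newer Spark releases (3.x) also accept a Column as the days argument to date_sub itself, which removes the need for expr; check your version before relying on this:

>>> from pyspark.sql.functions import col, date_sub
>>> df.withColumn('subtracted_dates', date_sub('date_col', col('days_col')))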


Answer 3:

Not the most elegant solution ever, but if you don't want to hack SQL expressions in Scala (not that it should be hard, but those APIs are private to sql), something like this should do the trick:

from pyspark.sql import Column

def date_sub_(c1: Column, c2: Column) -> Column:
    # Cast the date to Unix-epoch seconds, subtract the days (as seconds),
    # then cast back through timestamp to date.
    return ((c1.cast("timestamp").cast("long") - 60 * 60 * 24 * c2)
            .cast("timestamp").cast("date"))

For Python 2.x just drop type annotations.
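Usage with the example DataFrame would look something like this:

from pyspark.sql.functions import col

df.withColumn('subtracted_dates', date_sub_(col('date_col'), col('days_col')))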



Answer 4:

Slightly different format, but it also works:

df.registerTempTable("dfTbl")

newdf = spark.sql("""
                     SELECT *, date_sub(d.date_col, d.days_col) AS DateSub
                     FROM dfTbl d
                   """)