I'm using SparkSQL on pyspark to store some PostgreSQL tables into DataFrames and then build a query that generates several time series based on start and stop columns of type date.
Suppose that my_table contains:
start | stop
-------------------------
2000-01-01 | 2000-01-05
2012-03-20 | 2012-03-23
In PostgreSQL it's very easy to do that:
SELECT generate_series(start, stop, '1 day'::interval)::date AS dt FROM my_table
and it will generate this table:
dt
------------
2000-01-01
2000-01-02
2000-01-03
2000-01-04
2000-01-05
2012-03-20
2012-03-21
2012-03-22
2012-03-23
But how can this be done using plain SparkSQL? Will it be necessary to use UDFs or some DataFrame methods?
Suppose you have a DataFrame df from Spark SQL. Try this:
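import datetime
import pyspark.sql.functions as F
import pyspark.sql.types as T

def timeseriesDF(start, total):
    # Build the dates in plain Python: Column functions such as
    # F.date_add cannot be called on values inside a UDF.
    series = [start]
    for i in range(total - 1):
        series.append(series[-1] + datetime.timedelta(days=1))
    return series

# datediff(end, start) counts the days between the two dates;
# adding 1 makes the series include the stop date.
df.withColumn("t_series", F.udf(
    timeseriesDF,
    T.ArrayType(T.DateType())
)(df.start, F.datediff(df.stop, df.start) + 1)
).select(F.explode("t_series")).show()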
@Rakesh's answer is correct, but I would like to share a less verbose solution:
import datetime
from pyspark.sql.types import ArrayType, DateType

# UDF
def generate_date_series(start, stop):
    # One date per day from start to stop, both ends inclusive
    return [start + datetime.timedelta(days=x) for x in range(0, (stop - start).days + 1)]

# Register UDF for later usage
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType()))
# mydf is a DataFrame with columns `start` and `stop` of type DateType()
mydf.createOrReplaceTempView("mydf")
spark.sql("SELECT explode(generate_date_series(start, stop)) FROM mydf").show()
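The helper behind the UDF is ordinary Python, so its behaviour can be checked without a Spark session. The sketch below repeats the same function and runs it on the first row of the question's table; the reversed-range call is an extra case added here for illustration, not something from the original answer. Note that a null start or stop would arrive in the UDF as None and make the subtraction fail, so nulls would need to be filtered out first.

```python
import datetime

def generate_date_series(start, stop):
    """Return every date from start to stop, both ends inclusive."""
    return [start + datetime.timedelta(days=x) for x in range((stop - start).days + 1)]

# First row of my_table from the question
series = generate_date_series(datetime.date(2000, 1, 1), datetime.date(2000, 1, 5))
print(series[0], series[-1], len(series))  # 2000-01-01 2000-01-05 5

# If stop is earlier than start, range() is empty and so is the result
reversed_series = generate_date_series(datetime.date(2000, 1, 5), datetime.date(2000, 1, 1))
print(reversed_series)  # []
```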
EDIT
This creates a dataframe with one row containing an array of consecutive dates:
from pyspark.sql.functions import sequence, to_date, explode, col
spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) as date")
+------------------------------------------+
| date |
+------------------------------------------+
| ["2018-01-01","2018-02-01","2018-03-01"] |
+------------------------------------------+
You can use the explode function to "pivot" this array into rows:
spark.sql("SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) as date").withColumn("date", explode(col("date")))
+------------+
| date |
+------------+
| 2018-01-01 |
| 2018-02-01 |
| 2018-03-01 |
+------------+
(End of edit)
Spark v2.4 supports the sequence function:
sequence(start, stop, step) - Generates an array of elements from start to stop (inclusive), incrementing by step. The type of the returned elements is the same as the type of argument expressions.
Supported types are: byte, short, integer, long, date, timestamp.
Examples:
SELECT sequence(1, 5);
[1,2,3,4,5]
SELECT sequence(5, 1);
[5,4,3,2,1]
SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month);
[2018-01-01,2018-02-01,2018-03-01]
https://docs.databricks.com/spark/latest/spark-sql/language-manual/functions.html#sequence
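Applied to the table from the question, sequence and explode can take the start and stop columns directly, with no UDF. This is a minimal sketch, assuming Spark 2.4 or later, an existing SparkSession passed in as spark, and a temp view named my_table whose start and stop columns are dates. The helper names date_series_query and show_date_series are hypothetical and only here for illustration.

```python
def date_series_query(table, start_col="start", stop_col="stop"):
    """Build a Spark SQL query that expands each row's date range into one row per day."""
    return (
        f"SELECT explode(sequence({start_col}, {stop_col}, interval 1 day)) AS dt "
        f"FROM {table}"
    )

def show_date_series(spark, table="my_table"):
    """Run the query on an existing SparkSession (assumed to be Spark 2.4+)."""
    spark.sql(date_series_query(table)).show()

query = date_series_query("my_table")
print(query)
```

With a session available, show_date_series(spark) should give one dt row per day for every (start, stop) pair, which mirrors the PostgreSQL generate_series query in the question.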
The existing answers will work, but are very inefficient. Instead, it is better to use range and then cast the data. In Python:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

def generate_series(start, stop, interval):
    """
    :param start: lower bound, inclusive
    :param stop: upper bound, exclusive
    :param interval: increment interval in seconds (int)
    """
    spark = SparkSession.builder.getOrCreate()
    # Determine start and stop in epoch seconds
    start, stop = spark.createDataFrame(
        [(start, stop)], ("start", "stop")
    ).select(
        [col(c).cast("timestamp").cast("long") for c in ("start", "stop")]
    ).first()
    # Create range with increments and cast to timestamp
    return spark.range(start, stop, interval).select(
        col("id").cast("timestamp").alias("value")
    )
Example usage:
generate_series("2000-01-01", "2000-01-05", 60 * 60).show(5) # By hour
+-------------------+
| value|
+-------------------+
|2000-01-01 00:00:00|
|2000-01-01 01:00:00|
|2000-01-01 02:00:00|
|2000-01-01 03:00:00|
|2000-01-01 04:00:00|
+-------------------+
only showing top 5 rows
generate_series("2000-01-01", "2000-01-05", 60 * 60 * 24).show() # By day
+-------------------+
| value|
+-------------------+
|2000-01-01 00:00:00|
|2000-01-02 00:00:00|
|2000-01-03 00:00:00|
|2000-01-04 00:00:00|
+-------------------+
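The daily example stops at 2000-01-04 because the upper bound is exclusive, unlike PostgreSQL's generate_series. The plain-Python sketch below mimics the same epoch-seconds arithmetic to make that visible. It is only an illustration: the name epoch_series is hypothetical, and it assumes UTC throughout, whereas Spark casts using the session time zone.

```python
from datetime import datetime, timezone

def epoch_series(start, stop, interval):
    """Timestamps from start (inclusive) to stop (exclusive), interval in seconds."""
    # Convert both bounds to epoch seconds, assuming UTC
    start_s = int(datetime.fromisoformat(start).replace(tzinfo=timezone.utc).timestamp())
    stop_s = int(datetime.fromisoformat(stop).replace(tzinfo=timezone.utc).timestamp())
    # Step through the range and convert each value back to a timestamp
    return [datetime.fromtimestamp(s, tz=timezone.utc) for s in range(start_s, stop_s, interval)]

days = [d.date().isoformat() for d in epoch_series("2000-01-01", "2000-01-05", 60 * 60 * 24)]
print(days)  # ['2000-01-01', '2000-01-02', '2000-01-03', '2000-01-04']

# Moving stop one interval later includes 2000-01-05, as in the PostgreSQL output
inclusive = epoch_series("2000-01-01", "2000-01-06", 60 * 60 * 24)
print(len(inclusive))  # 5
```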