I see in this Databricks post that there is support for window functions in Spark SQL; in particular, I'm trying to use the lag() window function.
I have rows of credit card transactions, which I've sorted. Now I want to iterate over the rows and, for each row, display the amount of the transaction and the difference between the current row's amount and the preceding row's amount.
Following the Databricks post, I've come up with this query, but it's throwing an exception at me and I can't quite understand why.
This is in PySpark. tx is my DataFrame, already created and registered as a temp table.
test = sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")
and the exception (truncated):
py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found
I'd really appreciate any insight; this functionality is rather new and there's not a lot to go on as far as existing examples or other related posts.
Edit
I've also attempted to do this without a SQL statement, as below, but I continue to get an error. I've tried it with both HiveContext and SQLContext and receive the same error.
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = \
    Window \
        .partitionBy(h_tx_df_ordered['cc_num']) \
        .orderBy(h_tx_df_ordered['cc_num'], h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])
windowSpec.rowsBetween(-1, 0)
lag_amt = \
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])
h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()
Traceback (most recent call last):
File "rdd_raw_data.py", line 116, in <module>
lag_amt.alias("prev_amt")).show()
File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
jdf = self._jdf.select(self._jcols(*cols))
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
Answer

ROWS, not ROW. A frame specification also requires either a lower bound value (e.g. ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) or the UNBOUNDED keyword (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW). The LAG function doesn't accept a frame at all, so a correct SQL query with lag can look like this:
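For example, keeping the rest of the statement from the question and simply dropping the frame clause (a sketch, assuming tx is still registered as a temp table and, as discussed below, that sqlContext is a HiveContext):

test = sqlContext.sql("SELECT tx.cc_num, tx.trans_date, tx.trans_time, tx.amt, "
                      "lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY tx.trans_date, tx.trans_time) AS prev_amt "
                      "FROM tx")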
Edit:

Regarding the SQL DSL usage: as you can read in the error message (Note that, using window functions currently requires a HiveContext), be sure to initialize sqlContext using HiveContext, not SQLContext.
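A minimal sketch of that initialization for Spark 1.x (the app name here is arbitrary):

from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext(appName="tx-window")  # hypothetical app name
sqlContext = HiveContext(sc)  # HiveContext, not SQLContext, so window functions can be resolved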
windowSpec.rowsBetween(-1, 0) does nothing (rowsBetween returns a new WindowSpec rather than modifying the one it is called on, so the result is discarded here), but once again a frame specification is not supported by the lag function anyway.
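Putting that together, a sketch of the DataFrame version, assuming sqlContext is a HiveContext and h_tx_df_ordered is the DataFrame from the question (the lag(amt) - amt expression and the prev_amt alias are kept as in the original):

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# No frame clause: lag only needs the partitioning and the ordering.
windowSpec = Window \
    .partitionBy(h_tx_df_ordered['cc_num']) \
    .orderBy(h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])

h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt']).alias('prev_amt')
).show()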