This question already has an answer here:
-
How to get a value from the Row object in Spark Dataframe?
3 answers
I am currently exploring how to call big hql files (contains 100 line of an insert into select statement) via sqlContext.
Another thing is, The hqls files are parameterize, so while calling it from sqlContext, I want to pass the parameters as well.
Have gone through loads of blogs and posts, but not found any answers to this.
Another thing I was trying, to store an output of rdd into a variable.
pyspark
max_date=sqlContext.sql("select max(rec_insert_date) from table")
now want to pass max_date as variable to next rdd
incremetal_data=sqlConext.sql(s"select count(1) from table2 where rec_insert_date > $max_dat")
This is not working , moreover the value for max_date
is coming as =
u[row-('20018-05-19 00:00:00')]
now this is not clear how to trim those extra characters.
The sql Context reterns a Dataset[Row]. You can get your value from there with
max_date=sqlContext.sql("select count(rec_insert_date) from table").first()[0]
In Spark 2.0+ using spark Session you can
max_date=spark.sql("select count(rec_insert_date) from table").rdd.first()[0]
to get the underlying rdd from the returned dataframe
Shouldn't you use max(rec_insert_date)
instead of count(rec_insert_date)
?
You have two options on passing values returned from one query to another:
Use collect, which will trigger computations and assign returned value to a variable
max_date = sqlContext.sql("select max(rec_insert_date) from table").collect()[0][0] # max_date has actual date assigned to it
incremetal_data = sqlConext.sql(s"select count(1) from table2 where rec_insert_date > '{}'".format(max_date))
Another (and better) option is to use Dataframe API
from pyspark.sql.functions import col, lit
incremental_data = sqlContext.table("table2").filter(col("rec_insert_date") > lit(max_date))
Use cross join - it should be avoided if you have more than 1 result from the first query. The advantage is that you don't break the graph of processing, so everything can be optimized by Spark.
max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table") # max_date_df is a dataframe with just one row
incremental_data = sqlContext.table("table2").join(max_date_df).filter(col("rec_insert_date") > col("max_date"))
As for you first question how to call large hql files from Spark:
- If you're using Spark 1.6 then you need to create a HiveContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables
- If you're using Spark 2.x then while creating SparkSession you need to enable Hive Support https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
You can start by inserting im in a sqlContext.sql(...)
method, from my experience this usually works and is a nice starting point to rewrite the logic to DataFrames/Datasets API. There may be some issues while running it in your cluster because your queries will be executed by Spark's SQL engine (Catalyst) and won't be passed to Hive.