passing value of RDD to another RDD as variable -

Published 2019-03-03 15:49

Question:

This question already has an answer here:

  • How to get a value from the Row object in Spark Dataframe?

I am currently exploring how to call big HQL files (containing around 100 lines of insert-into-select statements) via sqlContext.

Also, the HQL files are parameterized, so when calling them from sqlContext, I want to pass the parameters as well.

I have gone through loads of blogs and posts, but have not found any answer to this.

Another thing I was trying is to store the output of an RDD in a variable.

In pyspark:

max_date=sqlContext.sql("select max(rec_insert_date) from table")

Now I want to pass max_date as a variable to the next query:

incremetal_data=sqlConext.sql(s"select count(1) from table2 where rec_insert_date > $max_dat")

This is not working; moreover, the value of max_date is coming back as

[Row(max(rec_insert_date)=u'2018-05-19 00:00:00')]

and it is not clear how to trim those extra characters to get just the date value.

Answer 1:

sqlContext.sql returns a DataFrame (a Dataset[Row]). You can get your value from there with

max_date=sqlContext.sql("select count(rec_insert_date) from table").first()[0]

In Spark 2.0+, using a SparkSession, you can use

 max_date=spark.sql("select count(rec_insert_date) from table").rdd.first()[0]

to get the underlying RDD from the returned DataFrame.
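
Put together, a minimal sketch (assuming Spark 2.x, Hive-backed tables, and the table/column names from the question; max is used rather than count since the goal is the latest date) might look like this:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # first() returns a Row; indexing into it with [0] gives the plain Python value
    max_date = spark.sql("select max(rec_insert_date) from table").first()[0]

    # max_date is now an ordinary Python value that can be interpolated
    # into the next query
    incremental_count = spark.sql(
        "select count(1) from table2 where rec_insert_date > '{}'".format(max_date)
    ).first()[0]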



Answer 2:

Shouldn't you use max(rec_insert_date) instead of count(rec_insert_date)?

You have two options for passing values returned from one query to another:

  1. Use collect, which will trigger the computation and assign the returned value to a variable:

    max_date = sqlContext.sql("select max(rec_insert_date) from table").collect()[0][0]
    # max_date now holds the actual date value
    incremental_data = sqlContext.sql("select count(1) from table2 where rec_insert_date > '{}'".format(max_date))

    Another (and better) option is to use the DataFrame API:

    from pyspark.sql.functions import col, lit

    incremental_data = sqlContext.table("table2").filter(col("rec_insert_date") > lit(max_date))

  2. Use a cross join - it should be avoided if the first query returns more than one row. The advantage is that you don't break the processing graph, so everything can be optimized by Spark (a Spark 2.x variant is sketched just after this list).

    max_date_df = sqlContext.sql("select max(rec_insert_date) as max_date from table")
    # max_date_df is a DataFrame with just one row
    incremental_data = sqlContext.table("table2").join(max_date_df).filter(col("rec_insert_date") > col("max_date"))
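
On Spark 2.1+ a join without a join condition has to be written as an explicit cross join (or spark.sql.crossJoin.enabled must be set). A rough sketch of option 2 in that style, reusing the table and column names above:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    max_date_df = spark.sql("select max(rec_insert_date) as max_date from table")

    # max_date_df has a single row, so the cross join is effectively free and
    # Spark can still optimize the whole plan as one job
    incremental_data = (
        spark.table("table2")
             .crossJoin(max_date_df)
             .filter(col("rec_insert_date") > col("max_date"))
    )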

As for your first question, how to call large HQL files from Spark:

  • If you're using Spark 1.6 then you need to create a HiveContext https://spark.apache.org/docs/1.6.1/sql-programming-guide.html#hive-tables
  • If you're using Spark 2.x then while creating SparkSession you need to enable Hive Support https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables
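
A rough sketch of those two setups (the app name is just a placeholder):

    # Spark 1.6: wrap the existing SparkContext in a HiveContext
    # from pyspark.sql import HiveContext
    # sqlContext = HiveContext(sc)

    # Spark 2.x: enable Hive support when building the SparkSession
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("hql-runner")     # placeholder app name
             .enableHiveSupport()
             .getOrCreate())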

You can start by inserting them into a sqlContext.sql(...) call; from my experience this usually works and is a nice starting point for rewriting the logic into the DataFrames/Datasets API. There may be some issues when running it on your cluster, because your queries will be executed by Spark's SQL engine (Catalyst) and won't be passed to Hive.
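
As a rough illustration of that starting point (the file path, the {max_date} placeholder style, and the naive statement splitting are illustrative assumptions, not something the answer prescribes), you could read the HQL file, substitute the parameters in Python, and feed each statement to spark.sql:

    # Read the parameterized HQL file and fill in the parameters in Python;
    # max_date is the value computed earlier
    with open("/path/to/incremental_load.hql") as f:
        hql = f.read().format(max_date=max_date)

    # Hive scripts usually contain several statements separated by ';', while
    # spark.sql runs one statement at a time (naive split, ignores ';' inside strings)
    for statement in (s.strip() for s in hql.split(";")):
        if statement:
            spark.sql(statement)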