I have a scenario where I need to compare two tables, source and destination, that live on two separate remote Hive servers. Can we use two SparkSessions for this?
Here is what I tried:
val spark = SparkSession.builder().master("local")
.appName("spark remote")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.160:9083")
.enableHiveSupport()
.getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
val sparkdestination = SparkSession.builder()
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.42:9083")
.enableHiveSupport()
.getOrCreate()
I tried calling SparkSession.clearActiveSession() and SparkSession.clearDefaultSession() between the two builders,
but it isn't working and throws the error below:
Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Is there any other way to access two Hive tables using multiple SparkSessions or SparkContexts?
Thanks
Look at the SparkSession getOrCreate method, whose documentation states:
gets an existing [[SparkSession]] or, if there is no existing one,
creates a new one based on the options set in this builder.
This method first checks whether there is a valid thread-local
SparkSession, and if yes, return that one. It then checks whether
there is a valid global default SparkSession, and if yes, return
that one. If no valid global default SparkSession exists, the method
creates a new SparkSession and assigns the newly created
SparkSession as the global default.
In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing
SparkSession.
That's the reason it is returning the first session along with its configuration.
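To make the quoted behavior concrete, here is a minimal sketch, assuming Spark 2.x or later is on the classpath and a local master is acceptable. The object name, app name, and the config key are chosen only for illustration and are not part of the question's setup.

```scala
import org.apache.spark.sql.SparkSession

object GetOrCreateDemo {
  def main(args: Array[String]): Unit = {
    // First call: no session exists yet, so a new one is created
    // and registered as the global default.
    val first = SparkSession.builder()
      .master("local[1]")
      .appName("getOrCreate-demo") // illustrative name
      .getOrCreate()

    // Second call: an existing session is found, so the builder returns it
    // and applies the new option to that same session.
    val second = SparkSession.builder()
      .config("spark.sql.shuffle.partitions", "4") // illustrative runtime option
      .getOrCreate()

    println(first eq second)                                // expected: true
    println(first.conf.get("spark.sql.shuffle.partitions")) // expected: 4

    // newSession() gives a separate session with its own SQL configuration,
    // but it still runs on the same underlying SparkContext.
    val isolated = first.newSession()
    println(isolated eq first)                           // expected: false
    println(isolated.sparkContext eq first.sparkContext) // expected: true

    first.stop()
  }
}
```

Note that, as far as I understand, sessions created with newSession() also share the same catalog connection, so this alone may not be enough to point a second session at a different metastore.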
Please go through the docs to find alternative ways to create a session.
I'm working on a Spark version below 2, so I am not sure exactly how to create a new session without the configurations colliding.
But here is a useful test case, SparkSessionBuilderSuite.scala, that shows how to do it.
The rest is DIY.
Example method from that test suite:
test("use session from active thread session and propagate config options") {
val defaultSession = SparkSession.builder().getOrCreate()
val activeSession = defaultSession.newSession()
SparkSession.setActiveSession(activeSession)
val session = SparkSession.builder().config("spark-config2", "a").getOrCreate()
assert(activeSession != defaultSession)
assert(session == activeSession)
assert(session.conf.get("spark-config2") == "a")
assert(session.sessionState.conf == SQLConf.get)
assert(SQLConf.get.getConfString("spark-config2") == "a")
SparkSession.clearActiveSession()
assert(SparkSession.builder().getOrCreate() == defaultSession)
SparkSession.clearDefaultSession()
}
I use this approach, and it works perfectly fine with Spark 2.1:
val sc = SparkSession.builder()
.config("hive.metastore.uris", "thrift://dbsyz1111:10000")
.enableHiveSupport()
.getOrCreate()
// Create DataFrame 1 by reading the data from a Hive table in metastore 1
val dataframe_1 = sc.sql("select * from <SourcetbaleofMetaStore_1>")
// Clear the active and default sessions so the next builder does not reuse session 1
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
// Initialize Spark session 2 with Hive metastore 2
val spc2 = SparkSession.builder()
.config("hive.metastore.uris", "thrift://dbsyz2222:10004")
.enableHiveSupport()
.getOrCreate()
// Load DataFrame 1 from session 1 into a new DataFrame in session 2 by passing its RDD and schema
val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema)
dataframe_2.write.mode("Append").saveAsTable(<targettableNameofMetastore_2>)
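Once both data sets are available as DataFrames in the same session, the comparison the question asks about could be sketched as follows. This is only an illustration under assumptions: it treats "compare" as a row-level set difference using except, uses small in-memory DataFrames in place of the real Hive tables, and the object name, the helper missingFrom, and the column names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object CompareDemo {
  // Rows present in `left` but absent from `right`.
  // except() uses set semantics, so duplicate rows are collapsed.
  def missingFrom(left: DataFrame, right: DataFrame): DataFrame =
    left.except(right)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("compare-demo") // illustrative name
      .getOrCreate()
    import spark.implicits._

    // Stand-ins for the source and destination tables (hypothetical data).
    val source = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "value")
    val destination = Seq((1, "a"), (2, "x")).toDF("id", "value")

    // Rows in source that destination does not contain.
    missingFrom(source, destination).show()
    // Rows in destination that source does not contain.
    missingFrom(destination, source).show()

    spark.stop()
  }
}
```

Both DataFrames need the same column count and compatible column types for except to work; if the two tables differ in schema, the columns would have to be selected and cast first.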