Can we use multiple SparkSessions to access two different Hive servers?

I have a scenario where I need to compare two tables, a source and a destination, that live on two separate remote Hive servers. Can we use two SparkSessions, something like what I tried below?

 val spark = SparkSession.builder().master("local")
  .appName("spark remote")
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.160:9083")
  .enableHiveSupport()
  .getOrCreate()

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

val sparkdestination = SparkSession.builder()
  .config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false")
  .config("javax.jdo.option.ConnectionUserName", "hiveroot")
  .config("javax.jdo.option.ConnectionPassword", "hivepassword")
  .config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
  .config("hive.metastore.uris", "thrift://192.168.175.42:9083")
  .enableHiveSupport()
  .getOrCreate() 

I tried SparkSession.clearActiveSession() and SparkSession.clearDefaultSession(), but it isn't working; it throws the error below:

Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient

Is there any other way to access two Hive tables using multiple SparkSessions or SparkContexts?

Thanks

Altimetry answered 6/7, 2017 at 12:43 Comment(0)

Look at the SparkSession getOrCreate method,

which states:

gets an existing [[SparkSession]] or, if there is no existing one, creates a new one based on the options set in this builder.

This method first checks whether there is a valid thread-local SparkSession, and if yes, return that one. It then checks whether there is a valid global default SparkSession, and if yes, return that one. If no valid global default SparkSession exists, the method creates a new SparkSession and assigns the newly created SparkSession as the global default. In case an existing SparkSession is returned, the config options specified in this builder will be applied to the existing SparkSession.

That's the reason it's returning the first session and its configurations.
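
As a quick illustration of that behaviour, here is a minimal sketch (the "my.option" key and its value are made up):

import org.apache.spark.sql.SparkSession

// The first builder creates the global default session.
val first = SparkSession.builder().master("local[*]").getOrCreate()

// The second builder does not create a new session: it returns the existing
// default and only applies its (non-static) options to it.
val second = SparkSession.builder().config("my.option", "value").getOrCreate()

assert(first eq second)                          // same underlying session
assert(second.conf.get("my.option") == "value")  // option applied to the existing session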

Please go through the docs to find alternative ways to create a session.


I'm working on a Spark version < 2, so I am not sure exactly how to create a new session without configuration collisions.

But here is a useful test case, SparkSessionBuilderSuite.scala, that shows how to do it; DIY.

Example method from that test case:

test("use session from active thread session and propagate config options") {
    val defaultSession = SparkSession.builder().getOrCreate()
    val activeSession = defaultSession.newSession()
    SparkSession.setActiveSession(activeSession)
    val session = SparkSession.builder().config("spark-config2", "a").getOrCreate()

    assert(activeSession != defaultSession)
    assert(session == activeSession)
    assert(session.conf.get("spark-config2") == "a")
    assert(session.sessionState.conf == SQLConf.get)
    assert(SQLConf.get.getConfString("spark-config2") == "a")
    SparkSession.clearActiveSession()

    assert(SparkSession.builder().getOrCreate() == defaultSession)
    SparkSession.clearDefaultSession()
  }
Isolative answered 6/7, 2017 at 13:21 Comment(2)
I can't find the newSession() method in SparkSession. Are there any examples, please? – Altimetry
I did research on the code that builds the SparkSession and gave the pointer above; you have to check the documented methods. In fact, I am working on a Spark version < 2. Please check methods like setActiveSession, etc. – Isolative

I use the approach below and it works perfectly fine with Spark 2.1:

import org.apache.spark.sql.SparkSession

val sc = SparkSession.builder()
             .config("hive.metastore.uris", "thrift://dbsyz1111:10000")
             .enableHiveSupport()
             .getOrCreate()

// Create dataframe 1 by reading the data from a Hive table of metastore 1
val dataframe_1 = sc.sql("select * from <SourceTableOfMetastore_1>")

// Reset the existing Spark sessions
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()

// Initialize Spark session 2 with Hive metastore 2
val spc2 = SparkSession.builder()
               .config("hive.metastore.uris", "thrift://dbsyz2222:10004")
               .enableHiveSupport()
               .getOrCreate()

// Load dataframe 1 of Spark session 1 into a new dataframe of Spark session 2
// by taking its schema and its data via the RDD API
val dataframe_2 = spc2.createDataFrame(dataframe_1.rdd, dataframe_1.schema)

dataframe_2.write.mode("append").saveAsTable("<targetTableNameOfMetastore_2>")
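
Since the question is about comparing a source and a destination table, here is a minimal sketch of what the comparison step could look like with the same two sessions (the destination table name is a placeholder, and spc2 and dataframe_2 come from the snippet above):

// Read the destination table through the second session (placeholder table name)
val destination_df = spc2.sql("select * from <DestinationTableOfMetastore_2>")

// except() compares rows by value across all columns
val onlyInSource      = dataframe_2.except(destination_df)
val onlyInDestination = destination_df.except(dataframe_2)

println(s"rows only in source: ${onlyInSource.count()}")
println(s"rows only in destination: ${onlyInDestination.count()}")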
Roadability answered 11/7, 2017 at 19:30 Comment(2)
How will the above logic work in Spark 1.6.0? Will the same thrift URI work with the HiveContext? – Buie
Is this really working? I tried the exact same scenario and I cannot see any change in the metastore connection when initialising spc2. – Epsomite

I use Spark 3.2 and would like to add something to Srinivas Bandaru's answer.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

val spark = SparkSession.builder()
  .master("local[2]")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()
spark.sql("show databases").show(false)

// Here is the point:
SQLConf.registerStaticConfigKey("hive.metastore.uris")
// or this can also work:
// spark.sparkContext.stop()

SparkSession.clearDefaultSession()
SparkSession.clearActiveSession()

val spark2 = SparkSession.builder()
  .master("local[2]")
  .config("hive.metastore.uris", "thrift://localhost:9084")
  .enableHiveSupport()
  .getOrCreate()
spark2.sql("show databases").show(false)

println(spark)
println(spark2)

The result is:

+---------+
|namespace|
+---------+
|default  |
|test     |
+---------+

org.apache.spark.sql.SparkSession@3a9c11fb
org.apache.spark.sql.SparkSession@416c8bb5
+---------+
|namespace|
+---------+
|default  |
+---------+

In short, we create a new SparkSession, but it reuses the old SparkContext, and when the new externalCatalog is created it still uses the old Hive URI.

// in org.apache.spark.sql.internal.SharedState
lazy val externalCatalog: ExternalCatalogWithListener = {

When we create a new SparkSession, only static config keys are applied to the new SparkConf; the rest go into the Hadoop configuration:

// in org.apache.spark.sql.internal.SharedState
initialConfigs.foreach {
      // We have resolved the warehouse path and should not set warehouse conf here.
      case (k, _) if k == WAREHOUSE_PATH.key || k == SharedState.HIVE_WAREHOUSE_CONF_NAME =>
      case (k, v) if SQLConf.isStaticConfigKey(k) =>
        logDebug(s"Applying static initial session options to SparkConf: $k -> $v")
        confClone.set(k, v)
      case (k, v) =>
        logDebug(s"Applying other initial session options to HadoopConf: $k -> $v")
        hadoopConfClone.set(k, v)
    }

So we have two ways to change the Hive URI: one is stopping the old SparkContext and creating a new one; the other is registering "hive.metastore.uris" as a static config key.

By the way, if you use a cloud product such as AWS or Aliyun, the config "spark.hive.imetastoreclient.factory.class": "org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory" is useful when switching the Hive metastore from the cloud version to the community version.
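
If that applies to you, here is a minimal sketch of where that option could be set, reusing the builder pattern from above (the metastore host and port are placeholders):

val spark3 = SparkSession.builder()
  .master("local[2]")
  // the factory class mentioned above, switching back to the community
  // SessionHiveMetaStoreClient implementation
  .config("spark.hive.imetastoreclient.factory.class",
    "org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClientFactory")
  .config("hive.metastore.uris", "thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()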

Lorrianelorrie answered 24/5 at 7:17 Comment(0)
