How to connect to remote hive server from spark [duplicate]

I'm running Spark locally and want to access Hive tables which are located in a remote Hadoop cluster.

I'm able to access the Hive tables by launching beeline under SPARK_HOME:

[ml@master spark-2.0.0]$./bin/beeline 
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://remote_hive:10000
Connecting to jdbc:hive2://remote_hive:10000
Enter username for jdbc:hive2://remote_hive:10000: root
Enter password for jdbc:hive2://remote_hive:10000: ******
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/ml/spark/spark-2.0.0/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
16/10/12 19:06:39 INFO jdbc.Utils: Supplied authorities: remote_hive:10000
16/10/12 19:06:39 INFO jdbc.Utils: Resolved authority: remote_hive:10000
16/10/12 19:06:39 INFO jdbc.HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://remote_hive:10000
Connected to: Apache Hive (version 1.2.1000.2.4.2.0-258)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://remote_hive:10000>

How can I access the remote Hive tables programmatically from Spark?

Revivify answered 12/10, 2016 at 11:16 Comment(1)
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc); sqlContext.sql("show tables") won't work?Staging

JDBC is not required

Spark connects directly to the Hive metastore, not through HiveServer2. To configure this:

  1. Put hive-site.xml on your classpath and point hive.metastore.uris to where your Hive metastore is hosted. Also see How to connect to a Hive metastore programmatically in SparkSQL?

  2. Import org.apache.spark.sql.hive.HiveContext, as it can run SQL queries over Hive tables.

  3. Define val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

  4. Run sqlContext.sql("show tables") to verify that it works (a minimal sketch follows below).

SparkSQL on Hive tables
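
Putting steps 1-4 together, here is a minimal sketch of how it can look in spark-shell; your_metastore_host:9083 and some_db.some_table below are placeholders for your own cluster's values:

// Minimal sketch for spark-shell (Spark 2.x). `sc` is the SparkContext the shell provides.
// Assumes hive-site.xml on the classpath (e.g. $SPARK_HOME/conf/hive-site.xml) contains
// at least the metastore URI:
//
//   <property>
//     <name>hive.metastore.uris</name>
//     <value>thrift://your_metastore_host:9083</value>
//   </property>

import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
sqlContext.sql("show tables").show()
sqlContext.sql("SELECT * FROM some_db.some_table LIMIT 10").show()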

Conclusion: if you must go the JDBC way

Have a look at connecting Apache Spark with Apache Hive remotely.

Please note that beeline also connects through JDBC; it is evident from your own log:

[ml@master spark-2.0.0]$./bin/beeline 
Beeline version 1.2.1.spark2 by Apache Hive
beeline> !connect jdbc:hive2://remote_hive:10000
Connecting to jdbc:hive2://remote_hive:10000

So please have a look at this interesting article:

  • Method 1: Pull table into Spark using JDBC
  • Method 2: Use Spark JdbcRDD with HiveServer2 JDBC driver
  • Method 3: Fetch dataset on a client side, then create RDD manually

Currently the HiveServer2 driver doesn't allow us to use the "Sparkling" Methods 1 and 2, so we can rely only on Method 3.

Below is an example code snippet through which it can be achieved:

Loading data from one Hadoop cluster (aka "remote") into another one (where my Spark lives, aka "domestic") through a HiveServer2 JDBC connection.

import java.sql.{Connection, DriverManager, ResultSet, Timestamp}
import scala.collection.mutable.MutableList

// Row layout of the remote Hive table we are fetching.
case class StatsRec (
  first_name: String,
  last_name: String,
  action_dtm: Timestamp,
  size: Long,
  size_p: Long,
  size_d: Long
)

// HiveServer2 JDBC connection details (taken from the beeline session above;
// replace the masked password with the real one).
val url = "jdbc:hive2://remote_hive:10000"
val user = "root"
val password = "******"

val conn: Connection = DriverManager.getConnection(url, user, password)
val res: ResultSet = conn.createStatement
                   .executeQuery("SELECT * FROM stats_201512301914")

// Fetch the whole result set on the client side...
val fetchedRes = MutableList[StatsRec]()
while (res.next()) {
  val rec = StatsRec(res.getString("first_name"),
     res.getString("last_name"),
     Timestamp.valueOf(res.getString("action_dtm")),
     res.getLong("size"),
     res.getLong("size_p"),
     res.getLong("size_d"))
  fetchedRes += rec
}
conn.close()

// ... and only then turn it into an RDD on the Spark ("domestic") side.
val rddStatsDelta = sc.parallelize(fetchedRes)
rddStatsDelta.cache()

// Basically we are done. To check the loaded data:

println(rddStatsDelta.count)
rddStatsDelta.take(10).foreach(println)
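
If you then want to query the fetched rows with Spark SQL, a possible follow-up sketch (assuming Spark 2.x and a HiveContext/SQLContext named sqlContext as in the steps above; the view name stats_delta is arbitrary) is to convert the RDD into a DataFrame and register it as a temporary view:

// Sketch: expose the fetched rows to Spark SQL as a temporary view.
import sqlContext.implicits._

val dfStatsDelta = rddStatsDelta.toDF()
dfStatsDelta.createOrReplaceTempView("stats_delta")
sqlContext.sql("SELECT first_name, size FROM stats_delta LIMIT 10").show()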
Staging answered 12/10, 2016 at 16:2 Comment(9)
Should Spark always be located on the same cluster as Hive? Is it possible to have it located on another cluster?Neb
@MichaelD: 1) may or may not be 2) it's possibleStaging
@RamGhadiyaram I was trying out the same, and I have a doubt. Will this method work when the data size is large? Say my Hive table is a few hundred GBs, wouldn't it lead to an "out of memory" exception? What should I do in such a scenario?Vie
@Vie: I am assuming you are talking about Method 3. If so, you can use a batching technique here: fetch, let's say, 100000 records as a list, convert that to one RDD, then clear the list and repeat for the remaining records. You will get rdd1 ... rddn, which you can then combine by union into a single RDD (a rough sketch follows after this comment thread). Since we only prepare a list of 100000 at a time and then clear it, an OOM won't come; if it does, reduce the number of records further.Staging
@RamGhadiyaram Sorry, yeah, I was referring to the third method. Could you please explain how to extract one continuously written Hive table in batches?Vie
Please see my comment above; that's what I was explainingStaging
Fails for me: org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location for replica 0Waters
Very useful suggestions. Do the Hive versions on the remote Hive server and the domestic (Spark) server need to be compatible or matching? The above suggestions seem to throw an error for me, yet it works in beeline when we use the older Hive library from the remote Hive server. Any inputs would be very helpful.Nicollenicolson
@user1652054 I was not able to get it ... please post a separate question; this is an almost 3-year-old postStaging
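
A rough sketch of the batching idea from the comment above; it reuses StatsRec, url, user, password and sc from the answer, and the loadInBatches helper, the 100000 batch size and the rddStatsDeltaBatched name are only illustrative, not part of the original answer:

// Illustrative helper (not from the original answer): pull the result set in
// batches, turn each full batch into a small RDD, and union the batch RDDs together.
import java.sql.{DriverManager, Timestamp}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rdd.RDD

def loadInBatches(batchSize: Int = 100000): RDD[StatsRec] = {
  val conn = DriverManager.getConnection(url, user, password)
  val res = conn.createStatement.executeQuery("SELECT * FROM stats_201512301914")

  var combined: RDD[StatsRec] = sc.emptyRDD[StatsRec]
  val buffer = ArrayBuffer[StatsRec]()

  while (res.next()) {
    buffer += StatsRec(res.getString("first_name"), res.getString("last_name"),
      Timestamp.valueOf(res.getString("action_dtm")),
      res.getLong("size"), res.getLong("size_p"), res.getLong("size_d"))
    if (buffer.size >= batchSize) {
      combined = combined.union(sc.parallelize(buffer.toList)) // copy the batch before clearing it
      buffer.clear()
    }
  }
  if (buffer.nonEmpty) combined = combined.union(sc.parallelize(buffer.toList))
  conn.close()
  combined
}

val rddStatsDeltaBatched = loadInBatches()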

After providing the hive-site.xml configuration to Spark and starting the Hive metastore service, two things need to be configured in the SparkSession while connecting to Hive:

  1. Since Spark SQL connects to the Hive metastore using Thrift, we need to provide the Thrift server URI while creating the Spark session.
  2. The Hive metastore warehouse, which is the directory where Spark SQL persists tables. Use the property 'spark.sql.warehouse.dir', which corresponds to 'hive.metastore.warehouse.dir' (deprecated since Spark 2.0).

Something like:

    // Set both properties on the builder, before getOrCreate(), so they take effect.
    SparkSession spark = SparkSession.builder().appName("Spark_SQL_5_Save To Hive")
            .config("hive.metastore.uris", "thrift://localhost:9083")
            .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
            .enableHiveSupport().getOrCreate();

Hope this was helpful !!

Tireless answered 3/6, 2019 at 13:34 Comment(0)

As per documentation:

Note that the hive.metastore.warehouse.dir property in hive-site.xml is deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of database in warehouse.

So in the SparkSession you specify spark.sql.warehouse.dir instead of hive.metastore.warehouse.dir, while the remote metastore itself is still pointed at via hive.metastore.uris:

    from pyspark.sql import SparkSession
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.uris", "thrift://<remote_ip>:9083") \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sql("show tables").show()
Seasick answered 4/3, 2020 at 6:7 Comment(2)
It is showing tables. But when I apply a query on that table it returns an empty dataframe.Abut
After setting this I am able to see records for "show tables". However, on a SQL query to a specific table, it gives this error: "No FileSystem for scheme s3". I have set s3a for Spark; if I use s3, then Spark throws an error to use s3a. Is this a deadlock?Bedford
