How to create hive table from Spark data frame, using its schema?

I want to create a hive table using my Spark dataframe's schema. How can I do that?

For fixed columns, I can use:

val CreateTable_query = "Create Table my_table(a string, b string, c double)"
sparksession.sql(CreateTable_query) 

But I have many columns in my dataframe, so is there a way to automatically generate such a query?

Indore answered 15/2, 2017 at 22:58 Comment(2)
Possible duplicate of Hadoop Hive unable to move source to destination - Clumsy
Create a HiveContext and then run: val CreateTable_query = hiveContext.sql("Create Table myTable as select * from mytempTable"). This will solve your issue. - Alodee

Assuming you are using Spark 2.1.0 or later and my_DF is your dataframe:

import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.spark.sql.types.StructType;

// get the schema as a string of comma-separated field-name/datatype pairs
StructType my_schema = my_DF.schema();
String columns = Arrays.stream(my_schema.fields())
                       .map(field -> field.name() + " " + field.dataType().typeName())
                       .collect(Collectors.joining(","));

// drop the table if it already exists
spark.sql("drop table if exists my_table");

// create the table using the dataframe schema
spark.sql("create table my_table(" + columns + ") "
    + "row format delimited fields terminated by '|' location '/my/hdfs/location'");

// write the dataframe data to the hdfs location of the created Hive table
my_DF.write()
     .format("com.databricks.spark.csv")
     .option("delimiter", "|")
     .mode("overwrite")
     .save("/my/hdfs/location");

The other method uses a temp table:

my_DF.createOrReplaceTempView("my_temp_table");
spark.sql("drop table if exists my_table");
spark.sql("create table my_table as select * from my_temp_table");
Weiss answered 11/7, 2017 at 12:41 Comment(4)
Why do we need to create temp tables? Is there any benefit over my_DF.write.saveAsTable(...)? - Malinin
#30664508 TL;DR saveAsTable doesn't create a Hive-compatible table. The question asks for a Hive table specifically, so... - Group
I would change field.dataType().typeName() to field.dataType().sql(); it handles complex/array types better (see the sketch below). - Gleeson
Scala translation: val tableColumns = df.schema.filter(_.name != partCol).map(field => field.name + " " + field.dataType.typeName).mkString(",") - Madlin
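
A minimal sketch of the point made in the comment above, using a hypothetical schema with an array column (built here only for illustration): dataType.sql preserves the full nested type, while dataType.typeName drops it.

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("tags", ArrayType(StringType))
))

// typeName loses the element type of complex columns: "id integer,tags array"
val withTypeName = schema.fields.map(f => s"${f.name} ${f.dataType.typeName}").mkString(",")

// sql keeps it: "id INT,tags ARRAY<STRING>"
val withSql = schema.fields.map(f => s"${f.name} ${f.dataType.sql}").mkString(",")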

As per your question, it looks like you want to create a table in Hive using your dataframe's schema. Since you say the dataframe has many columns, there are two options:

  • 1st is to create the Hive table directly from the dataframe.
  • 2nd is to take the schema of the dataframe and create the table in Hive from it.

Consider this code:

package hive.example

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object checkDFSchema extends App {
  val cc = new SparkConf()
  val sc = new SparkContext(cc)
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
  //First option for creating hive table through dataframe 
  val DF = sparkSession.sql("select * from salary")
  DF.createOrReplaceTempView("tempTable")
  sparkSession.sql("Create table yourtable as select * from tempTable")
  //Second option for creating hive table from schema
  val oldDFF = sparkSession.sql("select * from salary")
  //Generate the schema out of dataframe  
  val schema = oldDFF.schema
  //Generate an RDD of your data
  val rowRDD = sc.parallelize(Seq(Row(100, "a", 123)))
  //Creating new DF from data and schema 
  val newDFwithSchema = sparkSession.createDataFrame(rowRDD, schema)
  newDFwithSchema.createOrReplaceTempView("tempTable")
  sparkSession.sql("create table FinalTable AS select * from tempTable")
}
Gestapo answered 31/10, 2017 at 7:37 Comment(3)
Temp view? This appears to be creating a temporary table, not one in Hive..? Please show that the table was actually created in Hive, e.g. in which Hive database. - Evenfall
This "Create table yourtable as select * from tempTable" command will create a table in Hive with "yourtable" as the table name. I haven't mentioned any db name here, so it will be created in the default database. - Gestapo
I have done some additional research, and it seems your approach should be correct. The reason for my skepticism is that it is not working for me. I will have to create a separate question about how to intermix in-memory (temp) and Hive tables. - Evenfall

Another way is to use the methods available on StructType: sql, simpleString, treeString, etc.

You can create DDL from a dataframe's schema, and you can create a dataframe's schema from your DDL.

Here is one example (up to Spark 2.3):

    // Setup Sample Test Table to create Dataframe from
    spark.sql(""" drop database hive_test cascade""")
    spark.sql(""" create database hive_test""")
    spark.sql("use hive_test")
    spark.sql("""CREATE TABLE hive_test.department(
    department_id int ,
    department_name string
    )    
    """)
    spark.sql("""
    INSERT INTO hive_test.department values ("101","Oncology")    
    """)

    spark.sql("SELECT * FROM hive_test.department").show()

/***************************************************************/

Now I have a dataframe to play with. In real cases you'd use dataframe readers to create the dataframe from files or databases. Let's use its schema to create the DDL:

// Create DDL from the Spark dataframe schema using the simpleString function

// Regex to remove unwanted characters
val sqlrgx = """(struct<)|(>)|(:)""".r

// Create the DDL sql string and remove unwanted characters
val sqlString = sqlrgx.replaceAllIn(spark.table("hive_test.department").schema.simpleString, " ")

// Create the table with sqlString
spark.sql(s"create table hive_test.department2( $sqlString )")

From Spark 2.4 onwards you can use the fromDDL and toDDL methods on StructType:

val fddl = """
  department_id int,
  department_name string,
  business_unit string
  """

// Easily create a StructType from a DDL string using fromDDL
val schema3: StructType = org.apache.spark.sql.types.StructType.fromDDL(fddl)

// Create a DDL string from the StructType using toDDL
val tddl = schema3.toDDL

spark.sql("drop table if exists hive_test.department2 purge")

// Create the table using the tddl string
spark.sql(s"""create table hive_test.department2 ( $tddl )""")

// Test by inserting sample rows and selecting
spark.sql("""
  INSERT INTO hive_test.department2 values ("101", "Oncology", "MDACC Texas")
  """)
spark.table("hive_test.department2").show()
spark.sql("drop table hive_test.department2")

Aggie answered 1/7, 2019 at 9:45 Comment(1)
I get reflection and NullPointerExceptions when I try .toDDL. In general, it seems I can't get the DDL at compile time, and there seems to be some kind of interaction with Spark implicits or the session. I want to output Hive statements that I can run separately (for external partitioned tables), and there doesn't seem to be a way to do that. - Coda

From Spark 2.4 onwards you can use the toDDL function to get the column names and types (even for nested structs):

val df = spark.read....

df.schema.toDDL
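
As a minimal sketch (the table name and STORED AS clause here are placeholders, not from the original answer), the generated DDL string can be dropped straight into a CREATE TABLE statement:

val ddl = df.schema.toDDL   // e.g. "a STRING,b STRING,c DOUBLE"
spark.sql(s"CREATE TABLE my_table ($ddl) STORED AS PARQUET")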
Kumasi answered 3/10, 2019 at 1:54 Comment(1)
I can't find this in PySpark - is it Scala only? - Ringo

Here is a PySpark version that creates a Hive table from a parquet file. You may have generated the Parquet files using an inferred schema and now want to push the definition to the Hive metastore. You can also push the definition to a system like AWS Glue or AWS Athena, not just to the Hive metastore. Here I am using spark.sql to push/create the permanent table.

# Location where my parquet files are present.
df = spark.read.parquet("s3://my-location/data/")

buf = []
buf.append('CREATE EXTERNAL TABLE test123 (')

keyanddatatypes = df.dtypes
sizeof = len(df.dtypes)
print("size----------", sizeof)

count = 1
for eachvalue in keyanddatatypes:
    print(count, sizeof, eachvalue)
    if count == sizeof:
        # last column: no trailing comma
        total = str(eachvalue[0]) + ' ' + str(eachvalue[1])
    else:
        total = str(eachvalue[0]) + ' ' + str(eachvalue[1]) + ','
    buf.append(total)
    count = count + 1

buf.append(' )')
buf.append(' STORED as parquet ')
buf.append(" LOCATION ")
buf.append("'")
buf.append('s3://my-location/data/')
buf.append("'")
## partition by pt
tabledef = ''.join(buf)

print("---------print definition ---------")
print(tabledef)
## create the table using spark.sql. Assuming you are using spark 2.1+
spark.sql(tabledef)
Kettering answered 22/4, 2018 at 6:1 Comment(0)

Try the approach below in your spark shell, using Spark dataframes.

/* Read data from any file */

val df=spark.read.parquet("/test/sample/data/")

/* Check schema */

val schema1 = df.schema

/* getting columns and data type */

val columns = schema1.fields.map(field => s"${field.name} ${field.dataType.typeName}").mkString(",")

/* Create hive table */

spark.sql(s"""CREATE TABLE sample_table($columns)
stored as parquet
location '/test/sample/parsed/data'""")
Ancestor answered 3/5 at 17:52 Comment(0)
