How to create a Schema file in Spark

I am trying to read a schema file (which is a text file) and apply it to my CSV file, which has no header. Since I already have a schema file, I don't want to use the inferSchema option, which adds overhead.

My input schema file looks like this:

"num IntegerType","letter StringType"

I am trying the below code to create a schema from that file:

val schema_file = spark.read.textFile("D:\\Users\\Documents\\schemaFile.txt")
val struct_type = schema_file
  .flatMap(x => x.split(","))
  .map(b => (b.split(" ")(0).stripPrefix("\"").asInstanceOf[String],
             b.split(" ")(1).stripSuffix("\"").asInstanceOf[org.apache.spark.sql.types.DataType]))
  .foreach(x => println(x))

I am getting the error below:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.types.DataType

- field (class: "org.apache.spark.sql.types.DataType", name: "_2") - root class: "scala.Tuple2"

I am also trying to use this as the schema while reading with spark.read.csv as below, and then write the result as an ORC file:

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", false)
  .option("inferSchema", true)
  .option("samplingRatio", 0.01)
  .option("nullValue", "NULL")
  .option("delimiter", "|")
  .schema(schema_file)
  .csv("D:\\Users\\sampleFile.txt")
  .toDF().write.format("orc").save("D:\\Users\\ORC")

I need help converting the text schema file into a Spark schema and converting my input CSV file to ORC.

Encyclopedic answered 24/5, 2018 at 4:8 Comment(0)

To create a schema from a text file, create a function that matches the type name and returns the corresponding DataType:

import org.apache.spark.sql.types._

// Map a type name from the schema file to a Spark DataType; default to StringType
def getType(raw: String): DataType = {
  raw match {
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}

Now create the schema by reading the schema file:

import scala.io.Source

val schema = Source.fromFile("schema.txt").getLines().toList
  .flatMap(_.split(",")).map(_.replaceAll("\"", "").split(" "))
  .map(x => StructField(x(0), getType(x(1)), true))

Now read the CSV file:

spark.read
  .option("samplingRatio", "0.01")
  .option("delimiter", "|")
  .option("nullValue", "NULL")
  .schema(StructType(schema))
  .csv("data.csv")

Hope this helps!

Webster answered 24/5, 2018 at 5:29 Comment(5)
Thank you @ShankarKoirala. If the input schema is something like balance decimal(10,0), how can I update the code in the match function? decimal is common, but the precision will differ between fields, so something like case "decimal(10,0)" => DecimalType(10,0)Encyclopedic
@BalakrishnanRamasamy I think something like this: case "decimal(10,0)" => { val decimal = raw.split("(")(1).replace(")", "").split(","); DecimalType(decimal(0).toInt, decimal(1).toInt) } should work, but I haven't tested it.Webster
Thanks @ShankarKoirala, the main problem for me is writing the match case. Suppose the schema file contains balance decimal(10,0) and amount decimal(20,1). What would my match case look like then?Encyclopedic
I am not sure about this one, but matching on the decimal prefix of the type string should work; see the sketch after these comments.Webster
Excellent stuff.Bradford
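
For the decimal(precision,scale) case raised in the comments above, here is a hedged sketch (not part of the original answer and not tested against the asker's data) that extends the matching function with a regex extractor; the name getTypeWithDecimal is made up for illustration:

import org.apache.spark.sql.types._

// Possible extension of getType that also handles decimal(p,s) type strings
def getTypeWithDecimal(raw: String): DataType = {
  val decimalPattern = """decimal\((\d+)\s*,\s*(\d+)\)""".r
  raw match {
    case decimalPattern(precision, scale) => DecimalType(precision.toInt, scale.toInt)
    case "ByteType" => ByteType
    case "ShortType" => ShortType
    case "IntegerType" => IntegerType
    case "LongType" => LongType
    case "FloatType" => FloatType
    case "DoubleType" => DoubleType
    case "BooleanType" => BooleanType
    case "TimestampType" => TimestampType
    case _ => StringType
  }
}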

You can create a JSON file named schema.json in the format below:

{
  "fields": [
    {
      "metadata": {},
      "name": "first_fields",
      "nullable": true,
      "type": "string"
    },
    {
      "metadata": {},
      "name": "double_field",
      "nullable": true,
      "type": "double"
    }
  ],
  "type": "struct"
}

Create a struct schema by reading this file:

import json
from pyspark.sql.types import StructType

# Read the schema file and parse it into a StructType
rdd = spark.sparkContext.wholeTextFiles("s3://<bucket>/schema.json")
text = rdd.collect()[0][1]
schema_dict = json.loads(text)
custom_schema = StructType.fromJson(schema_dict)
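
Since the rest of this thread is in Scala, a hedged Scala equivalent of the same step (the local path is illustrative); DataType.fromJson parses the same JSON layout:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

// Read the JSON schema file and parse it into a StructType
val schemaJson = Source.fromFile("schema.json").mkString
val custom_schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]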

After that, you can use the struct as a schema to read the CSV file:

val df = spark.read
  .format("org.apache.spark.csv")
  .option("header", false)
  .option("inferSchema", true)
  .option("samplingRatio", 0.01)
  .option("nullValue", "NULL")
  .option("delimiter", "|")
  .schema(custom_schema)
  .csv("D:\\Users\\sampleFile.txt")
  .toDF().write.format("orc").save("D:\\Users\\ORC")
Joby answered 8/10, 2020 at 10:2 Comment(1)
In this example, how can we add another struct field to the JSON format, like this: val schema = new StructType().add("first_fields", StringType, true).add("double_field", StringType, true).add("another_struct_field", new StructType().add("banana", StringType, true), true)Tiresome
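
For the nested-struct question in the comment above, one way to discover the required JSON layout is to build the schema programmatically and print its JSON representation; a hedged sketch (field names taken from the comment, not from the original answer):

import org.apache.spark.sql.types._

// Build the schema, including a nested struct field, then print the JSON
// that schema.json would need to contain.
val nestedSchema = new StructType()
  .add("first_fields", StringType, true)
  .add("double_field", DoubleType, true)
  .add("another_struct_field", new StructType().add("banana", StringType, true), true)

println(nestedSchema.prettyJson)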

Something like this is a little more robust, since it uses the Hive metastore's FieldSchema type:

import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.spark.sql.types.StructType

// Convert a Spark schema into a list of Hive metastore FieldSchemas
def sparkToHiveSchema(schema: StructType): List[FieldSchema] = {
  schema.map(field =>
    new FieldSchema(field.name, field.dataType.catalogString, field.getComment.getOrElse(""))
  ).toList
}
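
A hedged usage sketch (not from the original answer), assuming an existing SparkSession named spark; getName and getType are FieldSchema's standard getters:

// Hypothetical usage: convert an existing DataFrame's schema to Hive FieldSchemas
val df = spark.read.option("delimiter", "|").csv("data.csv")
val hiveFields = sparkToHiveSchema(df.schema)
hiveFields.foreach(f => println(s"${f.getName}: ${f.getType}"))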


Feller answered 13/1, 2019 at 22:35 Comment(0)

You can specify the schema like this:

import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType}; 

For example:

val schema = new StructType(
  Array(
    StructField("Age", IntegerType, true),
    StructField("Name", StringType, true)
  )
)

val data = spark.read.option("header", "false").schema(schema).csv("filename.csv")
data.show()

This reads the data directly into a DataFrame with the specified schema.

Grandnephew answered 22/5, 2019 at 11:8 Comment(0)
