Avro Schema to Spark StructType
Asked Answered

This is effectively the same as my previous question, but using Avro rather than JSON as the data format.

I'm working with a Spark dataframe which could be loading data from one of a few different schema versions:

// Version One
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null}
 ]
}

// Version Two
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}

I'm using Spark Avro to load the data.

DataFrame df = context.read()
  .format("com.databricks.spark.avro")
  .load("path/to/avro/file");

which may be a Version One file or a Version Two file. However, I'd like to be able to process it in an identical manner, with the unknown values set to null. The recommendation in my previous question was to set the schema, but I do not want to repeat myself by writing the schema both in an .avsc file and as Spark's StructType and friends. How can I convert the Avro schema (either the text file or the generated MeObject.getClassSchema()) into Spark's StructType?

Spark Avro has a SchemaConverters class, but it is all private and returns some strange internal object.
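For a sense of what the conversion involves, here is a minimal pure-Python sketch of the mapping for the schemas above, using only the standard library. It handles just flat records, primitive types, and `["null", T]` unions; the names `avro_record_to_spark_json` and `AVRO_TO_SPARK` are illustrative, not part of any library, and the real spark-avro converter covers far more cases:

```python
import json

# Illustrative primitive-type mapping (assumption: only these types occur).
AVRO_TO_SPARK = {"int": "integer", "long": "long", "float": "float",
                 "double": "double", "boolean": "boolean", "string": "string"}

def avro_record_to_spark_json(avro_schema_str):
    """Map a flat Avro record schema to Spark's StructType JSON layout."""
    record = json.loads(avro_schema_str)
    fields = []
    for f in record["fields"]:
        t = f["type"]
        # A ["null", T] union becomes a nullable Spark field of type T.
        nullable = isinstance(t, list) and "null" in t
        if nullable:
            t = next(x for x in t if x != "null")
        fields.append({"name": f["name"], "type": AVRO_TO_SPARK[t],
                       "nullable": nullable, "metadata": {}})
    return {"type": "struct", "fields": fields}

version_two = '''
{"namespace": "com.example.avro", "type": "record", "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]}
'''
print(json.dumps(avro_record_to_spark_json(version_two)))
```

The resulting dictionary follows the same layout that Spark's `DataType.fromJson` accepts, which is what makes the JSON round-trip in the answer below possible.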

Ephram answered 24/11, 2015 at 16:52 Comment(2)
Down voter, can you please explain why the down vote? – Ephram
Please see my answer to #48828567 – Recitativo

Disclaimer: It's kind of a dirty hack. It depends on a few things:

  • Python provides a lightweight Avro processing library and due to its dynamism it doesn't require typed writers
  • an empty Avro file is still a valid document
  • Spark schema can be converted to and from JSON

The following code reads an Avro schema file, creates an empty Avro file with the given schema, reads it using spark-avro, and writes out the Spark schema as a JSON file.

import argparse
import tempfile

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from pyspark import SparkContext
from pyspark.sql import SQLContext

def parse_schema(schema):
    # Parse the .avsc file into an avro.schema.Schema object.
    with open(schema) as fr:
        return avro.schema.parse(fr.read())

def write_dummy(schema):
    # An empty (header-only) Avro file is still a valid document.
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, "wb") as fw:
        DataFileWriter(fw, DatumWriter(), schema).close()
    return tmp

def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())


def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()

    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)

    df = (sqlContext.read.format('com.databricks.spark.avro')
            .load(write_dummy(parse_schema(args.schema))))

    write_spark_schema(args.output, df.schema)
    sc.stop()


if __name__ == '__main__':
    main()

Usage:

bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
   avro_to_spark_schema.py \
   --schema path_to_avro_schema.avsc \
   --output path_to_spark_schema.json
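For the Version One schema from the question, the emitted Spark schema JSON would look roughly like this (exact formatting can vary by Spark version):

```json
{"type":"struct","fields":[{"name":"A","type":"integer","nullable":true,"metadata":{}}]}
```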

Read schema:

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
Pompous answered 24/11, 2015 at 23:1 Comment(0)

Please see if this helps, although it's a little late. I was trying hard to solve this for my current work. I used the SchemaConverters from Databricks. I suppose you were trying to read the Avro file with the given schema.

import java.io.File
import scala.collection.JavaConversions._
import org.apache.avro.Schema
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// Convert each Avro field's schema to a Spark DataType and accumulate.
val schemaObj = new Schema.Parser().parse(new File(avscfilepath))
var sparkSchema: StructType = new StructType
for (field <- schemaObj.getFields) {
  sparkSchema = sparkSchema.add(field.name, SchemaConverters.toSqlType(field.schema).dataType)
}
sparkSchema
Herat answered 30/7, 2016 at 21:5 Comment(0)
P
1

Using PySpark:

with open('path/to/avro/file', 'r') as avro_file:
    avro_schema = avro_file.read()

# "path/to/data" below is a placeholder for the actual Avro data path.
df = spark \
    .read \
    .format("avro") \
    .option("avroSchema", avro_schema) \
    .load("path/to/data")

df.schema
Photomap answered 31/1, 2023 at 16:31 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.