How to convert Avro Schema object into StructType in spark
I have an RDD of type Row, i.e. RDD[Row], and an Avro schema object. I need to create a DataFrame with this information.

I need to convert the Avro schema object into a StructType in order to create the DataFrame.

Can you please help?

Xebec answered 24/11, 2016 at 14:40 Comment(1)
possible duplicate of #33899917 (Sarcastic)
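
All of the answers below reduce to two steps: convert the Avro schema to a Spark StructType, then pass that StructType to createDataFrame. A minimal end-to-end PySpark sketch of those steps, assuming the spark-avro package is on the classpath, an existing SparkSession named spark, an RDD[Row] named row_rdd, and the hypothetical schema file /path/to/some.avsc:

import json

from pyspark.sql.types import StructType

# Read the Avro schema (a JSON string) from a hypothetical .avsc file
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_schema_str = avro_file.read()

# Convert it to a Spark SQL type via the JVM-side SchemaConverters
jvm = spark._jvm
sql_type = jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    jvm.org.apache.avro.Schema.Parser().parse(avro_schema_str)
)

# Round-trip through JSON to get a Python-side StructType
struct_type = StructType.fromJson(json.loads(sql_type.dataType().json()))

# Finally, build the DataFrame from the RDD[Row]
df = spark.createDataFrame(row_rdd, struct_type)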
com.databricks.spark.avro has a class to help you with this:

import com.databricks.spark.avro.SchemaConverters;

StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();
Gefell answered 15/2, 2017 at 12:26 Comment(0)
In PySpark 2.4.7, my solution is to create an empty DataFrame with the Avro schema and then take the StructType object from this empty DataFrame.

# Read the Avro schema (a JSON string) from the .avsc file
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_scheme = avro_file.read()

# Loading with the schema but no path yields an empty DataFrame
# whose schema matches the Avro schema
df = spark\
    .read\
    .format("avro")\
    .option("avroSchema", avro_scheme)\
    .load()

struct_type = df.schema
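
A quick way to confirm the round-trip worked is to inspect the resulting StructType; both calls below are standard pyspark.sql.types methods:

print(struct_type.simpleString())   # compact one-line rendering of the schema
print(struct_type.fieldNames())     # list of top-level field names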

Vitovitoria answered 5/1, 2021 at 22:35 Comment(0)
The answer from Wisnia works, but FYI another solution my coworkers and I came up with was the following:

import json

from pyspark.sql.types import StructType

avro_schema = "..."

# Convert the Avro schema to a Spark SQL type via the JVM-side SchemaConverters
java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)

# Round-trip the JVM StructType through JSON to get a Python-side StructType
java_struct_schema = java_schema_type.dataType()
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)
Nellienellir answered 3/9, 2021 at 15:48 Comment(0)
Updated as of 2020-05-31

Use the following if you're on Scala 2.12 with a newer Spark version.

sbt:

scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion

Scala:

import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType

// avroSchema is an org.apache.avro.Schema, e.g.
// val avroSchema = new org.apache.avro.Schema.Parser().parse(schemaString)
val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]
Cabrales answered 31/5, 2020 at 19:40 Comment(0)
Any example of doing the same in PySpark? The code below works for me, but there should be an easier way to do this:

# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4

import requests

from pyspark.sql.types import StructType

# Fetch the latest Avro schema for the subject from the schema registry
schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)

# Parse the schema on the JVM side and convert it to a Spark SQL type
spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text)
)
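
The spark_type above is still a JVM-side SchemaType object. As in the earlier py4j answer, a JSON round-trip turns it into a Python StructType; a short sketch continuing from the code above:

import json

# Extract the JVM StructType and round-trip it through JSON
# to obtain a Python-side StructType
struct_type = StructType.fromJson(json.loads(spark_type.dataType().json()))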
Rommel answered 18/11, 2020 at 23:46 Comment(1)
This! I scoured the interwebs and couldn't believe the PySpark API didn't include SchemaConverters. Just as I was closing in on this, bam! I find this. Bravo. (Osuna)
Databricks provides Avro-related utilities in the spark-avro package. Use the following dependency in sbt: "com.databricks" % "spark-avro_2.11" % "3.2.0"

Code:

import com.databricks.spark.avro.SchemaConverters

val sqlSchema = SchemaConverters.toSqlType(avroSchema)

Before version 3.2.0, toSqlType was a private method, so if you are using a version older than 3.2, either copy the complete method into your own util class or upgrade to the latest version.

Carrelli answered 27/3, 2018 at 9:30 Comment(1)
Why doesn't anyone indicate the needed package imports when answering questions like this? (Datum)
