I have an RDD of type Row, i.e. RDD[Row], and an Avro schema object. I need to create a DataFrame with this information.
I need to convert the Avro schema object into a StructType for creating the DataFrame.
Can you please help?
com.databricks.spark.avro has a class to help you with this:
StructType requiredType = (StructType) SchemaConverters.toSqlType(AvroClass.getClassSchema()).dataType();
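To see what that conversion does under the hood, here is a minimal, illustrative pure-Python sketch of the Avro-to-Spark type mapping for flat record schemas with primitive field types. This is not the real SchemaConverters (which also handles nested records, logical types, maps, and arrays); the type table and function name are invented for illustration.

```python
import json

# Assumption: a simplified mapping covering only Avro primitive types.
# Spark's SchemaConverters implements the full Avro specification.
AVRO_TO_SPARK = {
    "string": "string", "int": "integer", "long": "long",
    "float": "float", "double": "double", "boolean": "boolean",
    "bytes": "binary",
}

def avro_record_to_spark_json(avro_schema_str):
    """Map a flat Avro record schema to Spark's JSON schema format,
    which StructType.fromJson() accepts as a plain dict."""
    avro = json.loads(avro_schema_str)
    fields = []
    for f in avro["fields"]:
        ftype = f["type"]
        # An Avro union containing "null" marks the field as nullable.
        if isinstance(ftype, list):
            nullable = "null" in ftype
            ftype = next(t for t in ftype if t != "null")
        else:
            nullable = False
        fields.append({
            "name": f["name"],
            "type": AVRO_TO_SPARK[ftype],
            "nullable": nullable,
            "metadata": {},
        })
    return {"type": "struct", "fields": fields}
```

The returned dict has the same shape that a real StructType serializes to, so for simple schemas it shows exactly what toSqlType produces.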
In PySpark 2.4.7, my solution is to create an empty DataFrame with the Avro schema and then take the StructType object from this empty DataFrame.
with open('/path/to/some.avsc', 'r') as avro_file:
    avro_scheme = avro_file.read()

df = spark \
    .read \
    .format("avro") \
    .option("avroSchema", avro_scheme) \
    .load()

struct_type = df.schema
The answer from Wisnia works, but FYI another solution my coworkers and I came up with was the following:
import json
from pyspark.sql.types import StructType

avro_schema = "..."
java_schema_type = spark._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    spark._jvm.org.apache.avro.Schema.Parser().parse(avro_schema)
)
java_struct_schema = java_schema_type.dataType()
struct_json_schema = java_struct_schema.json()
json_schema_obj = json.loads(struct_json_schema)
schema = StructType.fromJson(json_schema_obj)
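The JSON round-trip at the end works because the Java StructType's json() method emits Spark's stable JSON schema format, which the Python StructType.fromJson() consumes as a plain dict. A sketch of what that intermediate JSON looks like (the field names here are invented for the example):

```python
import json

# Illustrative only: the string that java_struct_schema.json() returns
# has this shape for a two-field schema.
struct_json_schema = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "id", "type": "long", "nullable": False, "metadata": {}},
        {"name": "email", "type": "string", "nullable": True, "metadata": {}},
    ],
})

# json.loads turns it back into the plain dict that
# pyspark.sql.types.StructType.fromJson() accepts.
json_schema_obj = json.loads(struct_json_schema)
assert json_schema_obj["type"] == "struct"
```

Serializing to JSON on the JVM side and parsing on the Python side is what lets the schema cross the py4j boundary without any custom converters.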
Updated as of 2020-05-31
Use the below if you're on Scala 2.12 with a newer Spark version.
sbt:
scalaVersion := "2.12.11"
val sparkVersion = "2.4.5"
libraryDependencies += "org.apache.spark" %% "spark-avro" % sparkVersion
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
val schemaType = SchemaConverters
  .toSqlType(avroSchema)
  .dataType
  .asInstanceOf[StructType]
Any example of doing the same in PySpark? The code below works for me, but there should be some other, easier way to do this:
# pyspark --packages org.apache.spark:spark-avro_2.11:2.4.4
import requests

schema_registry_url = 'https://schema-registry.net/subjects/subject_name/versions/latest/schema'
schema_requests = requests.get(url=schema_registry_url)

spark_type = sc._jvm.org.apache.spark.sql.avro.SchemaConverters.toSqlType(
    sc._jvm.org.apache.avro.Schema.Parser().parse(schema_requests.text)
)
Databricks provides Avro-related utilities in the spark-avro package. Use the below dependency in sbt: "com.databricks" % "spark-avro_2.11" % "3.2.0"
Code:
val sqlSchema = SchemaConverters.toSqlType(avroSchema)
Before version 3.2.0, toSqlType was a private method, so if you are using a version older than 3.2, copy the complete method into your own util class; otherwise, upgrade to the latest version.