Deserialize Avro Spark

I'm pushing a stream of data to Azure Event Hubs with the following code, leveraging Microsoft.Hadoop.Avro. This code runs every 5 seconds, and simply plops the same two Avro-serialised items 👍🏼:

  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  {
      using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
      {
          using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
          {
              foreach (var item in itemList)
              {
                  streamWriter.Write(item);
              }
          }
      }

      // note: this Task is not awaited here; in production code the send should be awaited
      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  }

The schema used here is, again, very simple:

{
  "type": "record",
  "name": "User",
  "namespace": "SerDes",
  "fields": [
    {
      "name": "FirstName",
      "type": "string"
    },
    {
      "name": "LastName",
      "type": "string"
    }
  ]
}

I have validated that this all works, with a simple view in Azure Stream Analytics on the portal:

Stream Analytics Screenshot

So far so good, but I cannot, for the life of me, correctly deserialize this in Databricks using the from_avro() function in Scala.

Load the (exact same) schema as a string:

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

Configure the Event Hub:

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

Read the data:

// this works, and I can see the serialised data
display(eventhubs.select($"body"))

// this fails, and with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))

So essentially, what is going on here? I am serialising the data with the same schema I am deserialising with, yet something is malformed. The documentation is incredibly sparse on this front (very, very minimal on the Microsoft website).

Gynecology answered 7/11, 2019 at 16:36

The issue

After additional investigation (and mainly with the help of this article), I found what my problem was: from_avro(data: Column, jsonFormatSchema: String) expects the Spark schema format and not the Avro schema format. The documentation is not very clear on this.
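For reference, here is a minimal sketch (assuming spark-avro's SchemaConverters is available on the cluster, and reusing the schema path from the question) of what the Spark-side representation of the same Avro schema looks like; this is the conversion that Solution 2 below performs via a schema registry:

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// the Avro schema as JSON, exactly what the C# producer used
val avroJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

// the Spark SQL equivalent of that schema
// (a StructType with the FirstName and LastName string fields)
val sparkDataType = SchemaConverters.toSqlType(new Schema.Parser().parse(avroJsonSchema)).dataType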

Solution 1

Databricks provides a handy method, from_avro(column: Column, subject: String, schemaRegistryUrl: String), that fetches the needed Avro schema from a Kafka Schema Registry and automatically converts it to the correct format.

Unfortunately, it is not available in plain Spark, nor is it possible to use it without a Kafka Schema Registry.
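For illustration only, a rough sketch of what that Databricks-only overload could look like in use (the subject name, registry address, and the streaming DataFrame df are placeholders, not part of the original setup):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.avro.functions.from_avro

// Databricks-only: the Avro schema is fetched from the Schema Registry by subject name,
// so no local schema string is needed; "user-value" and the URL are placeholder values
val schemaRegistryAddr = "https://my-schema-registry:8081"

df.select(from_avro(col("value"), "user-value", schemaRegistryAddr).as("user"))
  .select("user.*")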

Solution 2

Use the schema conversion provided by Spark:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.{col, from_json}
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer

// define avro deserializer: decodes a Confluent-framed Avro payload and returns it as a JSON string
class AvroDeserializer(client: SchemaRegistryClient) extends AbstractKafkaAvroDeserializer {
  // hand the registry client to the Confluent base class (its protected schemaRegistry field)
  this.schemaRegistry = client

  override def deserialize(payload: Array[Byte]): String = {
    // call the parent's deserialize (not this method itself) to get a GenericRecord
    val genericRecord = super.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

// schema registry client, shared by the deserializer and the schema lookup below
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)

// create deserializer instance
val deserializer = new AvroDeserializer(registryClient)

// register deserializer as a UDF
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// get avro schema from registry (but I presume that it should also work with a schema read from a local file)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// consume data: deserialize each value to JSON, then parse it with the converted schema
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"), sparkSchema.dataType).as("data"))
  .select("data.*")
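For completeness, df above is assumed to be a streaming DataFrame read from Kafka; a minimal sketch of how it would typically be created in this setup (the broker address is a placeholder):

// assumed Kafka source for the snippet above; the broker address is a placeholder
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "latest")
  .load()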
Braided answered 3/7, 2020 at 17:05. Comments (5):
so I assume by this you are actually using a schema registry? from recollection (this is quite an old question now) I don't think I had a schema registry.. this would imply you are leveraging Apache Kafka perhaps? I will give it another whirl though - still got the code somewhere 😁 I will also double check my Spark version – Gynecology
p.s. I wrote my stuff in PySpark 😕 – Gynecology
from_avro with direct support for the Schema Registry is Databricks-only, as I remember... in stock Spark it requires a JSON schema, which you may get from the registry via HTTP – Howling
Yeah, you are right, it works in a Databricks notebook, but not in pure Spark :/ – Braided
Edit with more details – Braided
