Avro specific vs generic record types - which is best or can I convert between?
We’re trying to decide between providing generic vs specific record formats for consumption by our clients with an eye to providing an online schema registry clients can access when the schemas are updated. We expect to send out serialized blobs prefixed with a few bytes denoting the version number so schema retrieval from our registry can be automated.
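The version-prefix framing we have in mind could be sketched as follows; the 4-byte big-endian header width is just an illustration, any fixed-width prefix would do:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class VersionedPayload {

    // Prepend a 4-byte schema version to the serialized Avro blob.
    public static byte[] frame(int schemaVersion, byte[] avroBytes) {
        return ByteBuffer.allocate(4 + avroBytes.length)
                .putInt(schemaVersion)
                .put(avroBytes)
                .array();
    }

    // Clients read the version back to look up the schema in the registry...
    public static int version(byte[] framed) {
        return ByteBuffer.wrap(framed).getInt();
    }

    // ...and hand the remaining bytes to the Avro decoder.
    public static byte[] payload(byte[] framed) {
        return Arrays.copyOfRange(framed, 4, framed.length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[]{1, 2, 3});
        System.out.println(version(framed)); // 42
    }
}
```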

Now, we’ve come across code examples illustrating the relative adaptability of the generic format for schema changes but we’re reluctant to give up the type safety and ease-of-use provided by the specific format.

Is there a way to obtain the best of both worlds? That is, could we work with and manipulate the specific generated classes internally, and then have them converted to generic records automatically just before serialization?
Clients would then deserialize the generic records (after looking up the schema).

Also, could clients convert the generic records they receive to specific ones at a later time? Some small code examples would be helpful!

Or are we looking at this all the wrong way?

Shortening answered 23/11, 2015 at 22:37 Comment(2)
While you can, I'm not sure why you would want to convert specific records to generic ones. They support everything generic records do: serialization, schema evolution by appropriately choosing namespaces and aliases, etc. Could you clarify your use-case? Would your clients be able to generate the specific record classes?Aversion
Our clients could generate specific record classes, but the thinking is that using generic records would provide a kind of schema-change cushion. New fields could be ignored for a while until their developers got around to generating the new classes. This example shows generic records handling this.Shortening
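For reference, the cushion described in that comment is plain Avro schema resolution: a consumer reading generically with its old schema simply skips fields it does not know. A minimal sketch, assuming org.apache.avro is on the classpath (the Event schemas here are made up):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class EvolutionDemo {

    // Producer's newer schema: has an extra "note" field.
    static final Schema V2 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"int\"},"
      + "{\"name\":\"note\",\"type\":\"string\"}]}");

    // Consumer's older schema: knows nothing about "note".
    static final Schema V1 = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"int\"}]}");

    public static GenericRecord downgrade() throws IOException {
        // Write a record with the new schema.
        GenericRecord rec = new GenericData.Record(V2);
        rec.put("id", 7);
        rec.put("note", "added in v2");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(V2).write(rec, enc);
        enc.flush();

        // Read it back with the old schema: the unknown field is skipped,
        // so the consumer needs no regenerated classes.
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        return new GenericDatumReader<GenericRecord>(V2, V1).read(null, dec);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord old = downgrade();
        System.out.println(old.get("id"));   // 7
        System.out.println(old.get("note")); // null
    }
}
```

Note the reader must still know the writer's schema to decode the bytes, which is exactly what the version prefix / registry lookup provides.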

What you are looking for is the Confluent Schema Registry service and the libraries that integrate with it.

Here is a sample that serializes and deserializes Avro data with an evolving schema. Note that the sample is from Kafka.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;

import java.util.HashMap;
import java.util.Map;

public class ConfluentSchemaService {

    public static final String TOPIC = "DUMMYTOPIC";

    private KafkaAvroSerializer avroSerializer;
    private KafkaAvroDeserializer avroDeserializer;

    public ConfluentSchemaService(String confluentSchemaRegistryURL) {

        // Properties map
        Map<String, String> propMap = new HashMap<>();
        propMap.put("schema.registry.url", confluentSchemaRegistryURL);
        // Output after deserialization should be a specific record, not a generic record
        propMap.put("specific.avro.reader", "true");

        // configure(props, isKey): the boolean marks these as key (de)serializers;
        // pass false here if they will handle record values
        avroSerializer = new KafkaAvroSerializer();
        avroSerializer.configure(propMap, true);

        avroDeserializer = new KafkaAvroDeserializer();
        avroDeserializer.configure(propMap, true);
    }

    public String hexBytesToString(byte[] inputBytes) {
        return Hex.encodeHexString(inputBytes);
    }

    public byte[] hexStringToBytes(String hexEncodedString) throws DecoderException {
        return Hex.decodeHex(hexEncodedString.toCharArray());
    }

    public byte[] serializeAvroPOJOToBytes(GenericRecord avroRecord) {
        return avroSerializer.serialize(TOPIC, avroRecord);
    }

    public Object deserializeBytesToAvroPOJO(byte[] avroBytearray) {
        return avroDeserializer.deserialize(TOPIC, avroBytearray);
    }
}

The following classes contain all the code you are looking for:

io.confluent.kafka.serializers.KafkaAvroDeserializer
io.confluent.kafka.serializers.KafkaAvroSerializer

Please follow the link for more details:

http://bytepadding.com/big-data/spark/avro/avro-serialization-de-serialization-using-confluent-schema-registry/
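If you only need the specific/generic conversion itself, without a registry round trip, plain Avro can do it by writing the record to bytes and re-reading them with the other flavour of reader. A sketch, assuming Avro is on the classpath; `User` is a hypothetical code-generated class, substitute your own:

```java
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AvroConvert {

    // Specific -> generic: serialize with the specific writer,
    // deserialize with the generic reader against the same schema.
    public static GenericRecord toGeneric(SpecificRecordBase specific) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<SpecificRecordBase>(specific.getSchema()).write(specific, enc);
        enc.flush();
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        return new GenericDatumReader<GenericRecord>(specific.getSchema()).read(null, dec);
    }

    // Generic -> specific: the reverse direction. User stands in for
    // whatever class your build generates from the schema.
    public static User toSpecific(GenericRecord rec) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(rec.getSchema()).write(rec, enc);
        enc.flush();
        BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        return new SpecificDatumReader<User>(User.class).read(null, dec);
    }
}
```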

Mosora answered 6/11, 2017 at 11:40 Comment(0)

Can I convert between them?

I wrote the following Kotlin code to convert from a SpecificRecord to a GenericRecord and back, via JSON.

PositionReport is a class generated from an Avro schema by the Avro plugin for Gradle:


@org.apache.avro.specific.AvroGenerated
public class PositionReport extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
...

The functions used are below, together with the Avro imports they rely on:

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.io.DecoderFactory
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream

 /**
     * Encodes a record in AVRO Compatible JSON, meaning union types
     * are wrapped. For prettier JSON just use the Object Mapper
     * @param pos PositionReport
     * @return String
     */
    private fun PositionReport.toAvroJson() : String {
        val writer = SpecificDatumWriter(PositionReport::class.java)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Converts a GenericRecord into Avro-compatible JSON. It would be
     * cleaner to unify this function with the one above.
     * @return String
     */
    private fun GenericRecord.toAvroJson(): String {
        val writer = GenericDatumWriter<Any>(this.schema)
        val baos = ByteArrayOutputStream()

        val jsonEncoder = EncoderFactory.get().jsonEncoder(this.schema, baos)
        writer.write(this, jsonEncoder)
        jsonEncoder.flush()
        return baos.toString("UTF-8")
    }

    /**
     * Converts a GenericRecord of a position report back into a
     * PositionReport
     * @param gen GenericRecord
     * @return PositionReport
     */
    private fun toPosition(gen: GenericRecord) : PositionReport {

        if (gen.schema != PositionReport.getClassSchema()) {
            throw Exception("Cannot convert GenericRecord to PositionReport as the Schemas do not match")
        }

        // We convert into JSON and use that to convert back to the SpecificRecord.
        // There is probably a more direct way.
        val json = gen.toAvroJson()

        val reader: DatumReader<PositionReport> = SpecificDatumReader(PositionReport::class.java)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(PositionReport.getClassSchema(), json)
        val pos = reader.read(null, decoder)
        return pos
    }

    /**
     * Converts a Specific Record to a Generic Record (I think)
     * @param pos PositionReport
     * @return GenericData.Record
     */
    private fun toGenericRecord(pos: PositionReport): GenericData.Record {
        val json = pos.toAvroJson()

        val reader : DatumReader<GenericData.Record> = GenericDatumReader(pos.schema)
        val decoder: Decoder = DecoderFactory.get().jsonDecoder(pos.schema, json)
        val datum = reader.read(null, decoder)
        return datum
    }

There are a couple of differences, however, between the two:

  • Fields of Instant type in the SpecificRecord are encoded in the GenericRecord as long, and enums are represented slightly differently

So, for example, in my unit tests the time fields are checked like this:

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getIgtd().toEpochMilli(), gen.get("igtd"))

And enums are validated by their string form:

val gen = toGenericRecord(basePosition)
assertEquals(basePosition.getSource().toString(), gen.get("source").toString())

So, to convert between the two, you can do:


val gen = toGenericRecord(basePosition)
val newPos = toPosition(gen)

assertEquals(newPos, basePosition)

Wafture answered 7/1, 2021 at 23:7 Comment(0)
