Apache Flink read Avro byte[] from Kafka
Asked Answered
R

2

0

In reviewing examples I see alot of this:

FlinkKafkaConsumer08<Event> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

I see that they here already know the schema.

I do not know the schema until I read the byte[] into a Generic Record then get the schema. (As it may change from record to record)

Can someone point me into a FlinkKafkaConsumer08 that reads from byte[] into a map filter so that I can remove some leading bits, then load that byte[] into a Generic Record ?

Rancho answered 21/12, 2016 at 4:18 Comment(0)
S
0

I'm doing something similar (I'm using the 09 consumer)

In your main code pass in your custom deserializer:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());

The custom Deserialization Schema reads the bytes, figures out the schema and/or retrieves it from a schema registry, deserializes into a GenericRecord and returns the GenericRecord object.

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {


    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        //do your stuff here, strip off your bytes
        //deserialize and create your GenericRecord 
        return (T) (myavroevent);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }

}
Stinkstone answered 21/12, 2016 at 21:38 Comment(1)
Wow works right out of the box. Thanks it is obvious now that I look at this.Rancho
P
1

If you use Confluent's schema registry, I believe a preferred solution would be to use the Avro serde provided by Confluent. This way, we just call deserialize() and the resolution of the latest version of the Avro schema to use is done automatically behind the scene and no byte manipulation is required.

It boils down to something like this (example code in scala, a java solution would be very similar):

import io.confluent.kafka.serializers.KafkaAvroDeserializer

...

val valueDeserializer = new KafkaAvroDeserializer()
valueDeserializer.configure(
  Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava, 
  false)

...

override def deserialize(messageKey: Array[Byte], message: Array[Byte], 
                       topic: String, partition: Int, offset: Long): KafkaKV = {

    val key = keyDeserializer.deserialize(topic, messageKey).asInstanceOf[GenericRecord]
    val value = valueDeserializer.deserialize(topic, message).asInstanceOf[GenericRecord]

    KafkaKV(key, value)
    }

...

This method requires that the message producer is also integrated with the schema registry and publishes the schema there. This can be done in a very similar way as above, using Confluent's KafkaAvroSerializer

I posted a detailed explanation here: How to integrate Flink with Confluent's schema registry

Pitzer answered 16/8, 2017 at 10:15 Comment(3)
Thanks for the heads-up, it should be fixed now.Pitzer
BTW, I think it was FLINK-9337 / 9338 that have added Confluent serializer packagesCorenecoreopsis
Hi Send, I am using the way you describe and build a datastream with the generic record. But how do I write this stream in parquet format as parquet writer requires schema and my records have different schemas that are registered in the schema registry.Eolic
S
0

I'm doing something similar (I'm using the 09 consumer)

In your main code pass in your custom deserializer:

FlinkKafkaConsumer09<Object> kafkaConsumer = new FlinkKafkaConsumer09<>(
                parameterTool.getRequired("topic"), new MyDeserializationSchema<>(),
                parameterTool.getProperties());

The custom Deserialization Schema reads the bytes, figures out the schema and/or retrieves it from a schema registry, deserializes into a GenericRecord and returns the GenericRecord object.

public class MyDeserializationSchema<T> implements DeserializationSchema<T> {


    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        //do your stuff here, strip off your bytes
        //deserialize and create your GenericRecord 
        return (T) (myavroevent);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }

}
Stinkstone answered 21/12, 2016 at 21:38 Comment(1)
Wow works right out of the box. Thanks it is obvious now that I look at this.Rancho

© 2022 - 2024 — McMap. All rights reserved.