Kafka Streams - SerializationException: Unknown magic byte
I am trying to create a Kafka Streams Application which processes Avro records, but I am getting the following error:

Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

I am not sure what is causing this error. I am just trying to get Avro records into the application, where they will then be processed and output to another topic, but it does not seem to be working. I have included the code from the application below. Can anyone see why it is not working?

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    Serde<String> stringSerde = Serdes.String();
    Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();

    specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);


    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));


    KStream<String, trackingReport> outputreports = inputreports;

    String outputTopic = "outtesttopic";
    outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));

    Topology topology = builder.build();

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.start();
Enabling answered 18/12, 2018 at 14:42 Comment(0)

Unknown magic byte!

This means your data does not adhere to the wire format that the Confluent Schema Registry serializers expect.

Or, in other words, the data you're trying to read is not Avro in the framing that the Confluent Avro deserializer expects.
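For reference, that wire format is one zero "magic" byte, then a 4-byte big-endian Schema Registry ID, then the Avro binary payload. A quick sanity check on a raw record value (a hypothetical helper, not part of any Kafka API) might look like:

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    // A Confluent-framed Avro value needs at least 1 magic byte + 4 ID bytes,
    // and the first byte must be zero.
    public static boolean looksLikeConfluentAvro(byte[] payload) {
        if (payload == null || payload.length < 5) return false;
        return payload[0] == 0x0;
    }

    // Bytes 1-4 hold the Schema Registry ID as a big-endian int
    public static int schemaId(byte[] payload) {
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] good = {0x0, 0x0, 0x0, 0x0, 0x2A, 0x10};
        System.out.println(looksLikeConfluentAvro(good)); // prints true
        System.out.println(schemaId(good)); // prints 42
    }
}
```

A message that fails this check (non-zero first byte) is exactly what triggers "Unknown magic byte!".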

You can reproduce the same error by running kafka-avro-console-consumer, by the way, so you may want to debug using that tool too.
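For example (a hypothetical invocation; adjust the broker/registry addresses and topic name to your setup):

```shell
kafka-avro-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic intesttopic \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081 \
  --property print.key=true
```

If this command fails with the same "Unknown magic byte" error, the problem is in the producer, not in your Streams application.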

If you are sure your data is indeed Avro, and the schema is actually sent as part of the message (I would need to see your producer code), then you shouldn't use the Confluent Avro deserializers, which expect that specific byte framing in the message. Instead, you could use ByteArrayDeserializer and read the Avro record yourself, then pass it to the Apache Avro BinaryDecoder class. As a bonus, you can extract that logic into your own Deserializer class.
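A minimal sketch of that approach, assuming the writer schema is known to the consumer up front (the one-field schema here is a hypothetical stand-in for trackingReport):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Sketch: decode a raw Avro value (no Confluent 5-byte header) with the
// Apache Avro BinaryDecoder, given the writer schema.
public class PlainAvroDecoder {
    private final GenericDatumReader<GenericRecord> reader;

    public PlainAvroDecoder(Schema writerSchema) {
        this.reader = new GenericDatumReader<>(writerSchema);
    }

    public GenericRecord decode(byte[] payload) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical one-field schema standing in for the real record type
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"R\",\"fields\":"
            + "[{\"name\":\"x\",\"type\":\"int\"}]}");

        // Encode a record the way a plain (non-Confluent) producer would
        GenericRecord rec = new GenericData.Record(schema);
        rec.put("x", 7);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(rec, enc);
        enc.flush();

        // Decode it back without any Schema Registry involvement
        GenericRecord back = new PlainAvroDecoder(schema).decode(out.toByteArray());
        System.out.println(back.get("x")); // prints 7
    }
}
```

Wrapping this logic in a Deserializer (or a Serde, for Streams) is then mostly boilerplate around the decode method.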

Also, if the input topic is Avro, I don't think you should be using this property to read the values as strings:

DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
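If the values on your topics really are Confluent-framed Avro, a sketch of the alternative configuration (assuming the same registry URL as in the question) would make the Avro serde the default for values instead:

```java
// Sketch: default value serde set to the Avro serde, so topics read without
// an explicit Consumed.with(...) still deserialize correctly; keys stay strings.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
          Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
          SpecificAvroSerde.class.getName());
// The serde needs the registry URL to be configurable via the Streams config
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
          "http://localhost:8081");
```

Note that Consumed.with(...) does override the default for that one stream, so this alone won't change the error, but it avoids surprises on any topic read without an explicit serde.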
Nemato answered 18/12, 2018 at 14:53 Comment(11)
Yeah I just checked running the command and it did not work either. My producer is the same as the one at: #53782139Enabling
Yeah I understand that property but thought it was okay to override as I have done with Consumed.withEnabling
intesttopic is not the same topic as the one being sent to in the previous postNemato
By the way, outputreports is an unnecessary variable. There's no need to copy the KStream variable to a new nameNemato
If I am not using the Confluent Avro deserializer, should I be creating a custom one?Enabling
Your deserializer needs to invert whatever serializer you used in the producer. In Kafka Streams, you have a Serde class that combines the two... I'm not sure if that answers your questionNemato
Re "Or if it is, and the schema is sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers" - what Deserializer should you use in this case? We're trying to read Avro messages from Oracle Golden Gate and getting the same error when attempting to deserialize with KafkaAvroDeserializerKenji
@Kevin What serializers / converters did you configure GoldenGate with? If you used the Confluent ones, then those work with KafkaAvroDeserializer. If you used JSON converters, then you wouldn't use Avro. Also make sure you check both the key and value of the converter settingsNemato
@cricket_007 KafkaAvroSerializer/KafkaAvroDeserializer with messages sent with GG Kafka Handler get the Unknown Magic Byte error, but the same KafkaAvroSerializer/KafkaAvroDeserializer for messages sent by GG using Kafka Connect Handler work as expected. Not sure if we had Kafka Handler misconfigured, but Kafka Connect Handler is working for usKenji
@Kevin , I'm actually not familiar with GoldenGate, but happy to look into it, if you create a full post with the issue rather than just a comment. The "Kafka Connect Handler" would use AvroConverter, which wraps those serializers you mentioned. Confluent only offers one combination of those classes, so I'm not sure which package names you're referring toNemato
If you test deserialisation using the kafka-avro-console-consumer, be sure to add "--property print.key=true". The problem may be that the record key is not valid Avro, even if the value is.Metalwork
