Kafka Streams protobuf cast exception
I am using Kafka Streams to read and process protobuf messages.

I am using the following properties for the stream:


    // Enclosing method shown for completeness; the method name is illustrative
    private Properties buildStreamProperties() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        properties.put(StreamsConfig.CLIENT_ID_CONFIG, kafkaConfig.getClientId());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaConfig.getApplicationId());
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());

        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaProtobufSerde.class);
        properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaConfig.getSchemaRegistryUrl());
        properties.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufData.class);
        return properties;
    }

But while running I encounter this error:

Caused by: java.lang.ClassCastException: class com.google.protobuf.DynamicMessage cannot be cast to class model.schema.proto.input.ProtobufDataProto$ProtobufData (com.google.protobuf.DynamicMessage and model.schema.proto.input.ProtobufDataProto$ProtobufData are in unnamed module of loader 'app')

My .proto files look as follows:

syntax = "proto3";

import "inner_data.proto";
package myPackage;

option java_package = "model.schema.proto.input";
option java_outer_classname = "ProtobufDataProto";

message OuterData {
    string timestamp = 1;
    string x = 3;
    repeated InnerData flows = 4;
}

(I have two separate proto files)

syntax = "proto3";

package myPackage;

option java_package = "model.schema.proto.input";
option java_outer_classname = "InnerDataProto";

message InnerData {
  string a = 1;
  string b = 2;
  string c = 3;
}
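
For reference, with java_outer_classname set as above, protoc nests the generated message classes inside the outer class, so (assuming standard protoc output for the two files above) they are referenced like this:

    // Generated messages are nested inside the java_outer_classname wrapper
    // (a sketch assuming standard protoc output for the two files above)
    ProtobufDataProto.OuterData outer = ProtobufDataProto.OuterData.getDefaultInstance();
    InnerDataProto.InnerData inner = InnerDataProto.InnerData.getDefaultInstance();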

I would like to know why Kafka uses DynamicMessage even though I passed the specific protobuf value class in the properties, and how to fix this.

Lise answered 29/12, 2021 at 12:5 Comment(2)
Refer: docs.confluent.io/platform/current/schema-registry/… It says "Finally, if no type is provided or no type can be derived, the deserializer uses the schema to return an instance of a Protobuf DynamicMessage."Jamshedpur
Is the inconsistency between your .proto message, called OuterData, and the Java class, called ProtobufData, a typo? If it is not a typo, this might be the cause of your error: you are using different classes.Employee

I had the same issue while trying to make Kafka Streams work with protobuf.

I solved it by using KafkaProtobufSerde explicitly to configure the stream builder AND by explicitly specifying the class to deserialize to with this line: serdeConfig.put(SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufDataProto.OuterData.class.getName());

    /*
     * Define a specific serde for the protobuf value
     */
    final KafkaProtobufSerde<ProtobufDataProto.OuterData> protoSerde = new KafkaProtobufSerde<>();
    Map<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put(SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    /*
     * Technically, the following line is only mandatory in order to deserialize the value into a
     * GeneratedMessageV3 subclass and NOT into a DynamicMessage:
     * https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/DynamicMessage
     */
    serdeConfig.put(SPECIFIC_PROTOBUF_VALUE_TYPE, ProtobufDataProto.OuterData.class.getName());
    // false = this serde is used for record values, not keys
    protoSerde.configure(serdeConfig, false);

Then I can create my input stream and it will be deserialized:

 //Define a Serde for the key
 final Serde<byte[]> bytesSerde = Serdes.ByteArray();
 //Define the stream
 StreamsBuilder streamsBuilder = new StreamsBuilder();
 streamsBuilder.stream("inputTopic", Consumed.with(bytesSerde, protoSerde));
 /*
  * Add your processing here: map, filter, etc.
  */
 streamsBuilder.build();
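
Finally, the topology can be wired into a KafkaStreams instance as usual. Here is a minimal sketch; the application id, bootstrap servers, and topic name are placeholder values, not taken from the snippets above:

 //Minimal wiring sketch; "protobuf-stream-app" and "localhost:9092" are placeholders
 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-stream-app");
 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

 KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), props);
 streams.start();
 //Close the streams client cleanly on JVM shutdown
 Runtime.getRuntime().addShutdownHook(new Thread(streams::close));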
Peacemaker answered 1/8, 2022 at 12:36 Comment(0)
