Kafka consumer unit test with Avro Schema registry failing
I'm writing a consumer that listens to a Kafka topic and consumes messages as they become available. I've tested the code against a local Kafka instance and it works fine.

The unit/component tests, however, fail with an Avro Schema Registry URL error. I've tried the various options I found online, but none of them worked, and I'm not sure my approach is even correct. Please help.

Listener Class

@KafkaListener(topics = "positionmgmt.v1", containerFactory = "genericKafkaListenerFactory")
    public void receive(ConsumerRecord<String, GenericRecord> consumerRecord) {
        try {
            GenericRecord generic = consumerRecord.value();
            Object obj = generic.get("metadata");

            ObjectMapper mapper = new ObjectMapper();

            Header headerMetaData = mapper.readValue(obj.toString(), Header.class);

            System.out.println("Received payload :   " + consumerRecord.value());

            //Call backend with details in GenericRecord 

        } catch (Exception e) {
            System.out.println("Exception while reading message from Kafka " + e);
        }
    }

Kafka config

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> genericKafkaListenerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(genericConsumerFactory());
        return factory;
    }

public ConsumerFactory<String, GenericRecord> genericConsumerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        config.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
        return new DefaultKafkaConsumerFactory<>(config);
    }

Avro Schema

{
   "type":"record",
   "name":"KafkaEvent",
   "namespace":"com.ms.model.avro",
   "fields":[
      {
         "name":"metadata",
         "type":{
            "name":"metadata",
            "type":"record",
            "fields":[
               {
                  "name":"correlationid",
                  "type":"string",
                  "doc":"this is corrleation id for transaction"
               },
               {
                  "name":"subject",
                  "type":"string",
                  "doc":"this is subject for transaction"
               },
               {
                  "name":"version",
                  "type":"string",
                  "doc":"this is version for transaction"
               }
            ]
         }
      },
      {
         "name":"name",
         "type":"string"
      },
      {
         "name":"dept",
         "type":"string"
      },
      {
         "name":"empnumber",
         "type":"string"
      }
   ]
}

This is my test code which I tried...

@ComponentTest
    @RunWith(SpringRunner.class)
    @EmbeddedKafka(partitions = 1, topics = { "positionmgmt.v1" })
    @SpringBootTest(classes={Application.class})
    @DirtiesContext
    public class ConsumeKafkaMessageTest {

      private static final String TEST_TOPIC = "positionmgmt.v1";

      @Autowired(required=true)
      EmbeddedKafkaBroker embeddedKafkaBroker;

      private Schema schema;

      private  SchemaRegistryClient schemaRegistry;
      private  KafkaAvroSerializer avroSerializer;
      private  KafkaAvroDeserializer avroDeserializer;

      private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
      private String registryUrl = "unused";

      private String avroSchema = string representation of avro schema

      @BeforeEach
      public void setUp() throws Exception {
        Schema.Parser parser = new Schema.Parser();
        schema = parser.parse(avroSchema);

        mockSchemaRegistryClient.register("Vendors-value", schema);
      }

      @Test
      public void consumeKafkaMessage_receive_sucess() {

        Schema metadataSchema = schema.getField("metadata").schema();
        GenericRecord metadata = new GenericData.Record(metadataSchema);
        metadata.put("version", "1.0");
        metadata.put("correlationid", "correlationid");
        metadata.put("subject", "metadata");

        GenericRecord record = new GenericData.Record(schema);
        record.put("metadata", metadata);
        record.put("name", "ABC");
        record.put("dept", "XYZ");

        Consumer<String, GenericRecord> consumer = configureConsumer();
        Producer<String, GenericRecord> producer = configureProducer();

        ProducerRecord<String, GenericRecord> prodRecord = new ProducerRecord<String, GenericRecord>(TEST_TOPIC, record);

        producer.send(prodRecord);

        ConsumerRecord<String, GenericRecord> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertNotNull(singleRecord.value());

        consumer.close();
        producer.close();

      }

      private Consumer<String, GenericRecord> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("groupid", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<String, GenericRecord> consumer = new DefaultKafkaConsumerFactory<String, GenericRecord>(consumerProps).createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
      }

      private Producer<String, GenericRecord> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        producerProps.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, mockSchemaRegistryClient);
        producerProps.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, "false");
        return new DefaultKafkaProducerFactory<String, GenericRecord>(producerProps).createProducer();
      }

}

Error

component.com.ms.listener.ConsumeKafkaMessageTest > consumeKafkaMessage_receive_sucess() FAILED
    org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:457)
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:289)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:318)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:305)
        at component.com.ms.listener.ConsumeKafkaMessageTest.configureProducer(ConsumeKafkaMessageTest.java:125)
        at component.com.ms.listener.ConsumeKafkaMessageTest.consumeKafkaMessage_receive_sucess(ConsumeKafkaMessageTest.java:97)

        Caused by:
        io.confluent.common.config.ConfigException: Invalid value io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient@20751870 for configuration schema.registry.url: Expected a comma separated list.
            at io.confluent.common.config.ConfigDef.parseType(ConfigDef.java:345)
            at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:249)
            at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
            at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:105)
            at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
            at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
            at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.configure(ExtendedSerializer.java:60)
            at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:372)
            ... 5 more
Celadon answered 20/8, 2019 at 13:42 Comment(0)
I
5

I investigated this a bit and found that the problem is the CachedSchemaRegistryClient used by the KafkaAvroSerializer/KafkaAvroDeserializer. It fetches the schema definitions from the Confluent Schema Registry.

You already have your schema definition locally, so you don't need to go to the Schema Registry for it (at least in your tests).

I had a similar problem and solved it by creating a custom KafkaAvroSerializer/KafkaAvroDeserializer.

This is a sample KafkaAvroSerializer. It is rather simple: you just extend the provided KafkaAvroSerializer and tell it to use MockSchemaRegistryClient.

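import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        // Replace the real registry client with an in-memory mock.
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    // The client argument is ignored; a mock is always used.
    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}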

This is a sample KafkaAvroDeserializer. When the deserialize method is called, you need to tell it which schema to use.

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        this.schemaRegistry = getMockClient(KafkaEvent.SCHEMA$);  
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized Schema getById(int id) {
                return schema$;
            }
        };
    }
}
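Some background on why overriding getById is enough: Confluent's Avro serializers prefix every record with a 5-byte header (a magic byte 0 followed by a 4-byte big-endian schema ID), and the deserializer looks that ID up in its registry client before decoding the rest. The sketch below uses only the JDK to show that framing. WireFormatHeader and its methods are hypothetical names for illustration, not part of the Confluent API.

```java
import java.nio.ByteBuffer;

/** Illustrative helper for the 5-byte header in front of an Avro payload. */
final class WireFormatHeader {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    private WireFormatHeader() {}

    /** Returns the big-endian schema ID stored in bytes 1..4 of the payload. */
    static int schemaId(byte[] payload) {
        if (payload == null || payload.length < HEADER_SIZE) {
            throw new IllegalArgumentException("Payload is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    /** Builds a framed payload: magic byte, schema ID, then the Avro-encoded body. */
    static byte[] frame(int schemaId, byte[] avroBody) {
        return ByteBuffer.allocate(HEADER_SIZE + avroBody.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(avroBody)
                .array();
    }
}
```

The mock client above ignores the ID it is asked for and always returns the same schema, so whatever ID the producer wrote into the header still resolves in the test.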

The last step is to tell Spring to use the custom Serializer/Deserializer:

spring.kafka.producer.properties.schema.registry.url = not-used
spring.kafka.producer.value-serializer = CustomKafkaAvroSerializer
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.group-id = showcase-producer-id

spring.kafka.consumer.properties.schema.registry.url = not-used
spring.kafka.consumer.value-deserializer = CustomKafkaAvroDeserializer
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.group-id = showcase-consumer-id
spring.kafka.consumer.auto-offset-reset = earliest

spring.kafka.producer.properties.auto.register.schemas = true
spring.kafka.properties.specific.avro.reader = true

I wrote a short blog post about that: https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1?source=friends_link&sk=e55f73b86504e9f577e259181c8d0e23

Link to the working sample project: https://github.com/ivlahek/kafka-avro-without-registry

Ipswich answered 18/10, 2019 at 21:37 Comment(3)
You gave a neat explanation and a code sample as well. Thanks. I followed your approach but can't deserialize. It looks like it is still looking for the schema registry URL, because my custom deserializer extends KafkaAvroDeserializer: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition kidame-0 at offset 0. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 1 Caused by: java.io.IOException: Cannot get schema from schema registry! - Sapp
Any idea about the above issue? My consumer config uses the custom deserializer and no schema registry. - Sapp
Can you please share your configuration or a code sample in some code repo? - Ipswich
Q
2

The answer from @ivlahek works, but if you are looking at this example three years later, you might want to make a slight modification to CustomKafkaAvroDeserializer:

private static SchemaRegistryClient getMockClient(final Schema schema) {
    return new MockSchemaRegistryClient() {
        @Override
        public ParsedSchema getSchemaBySubjectAndId(String subject, int id)
                throws IOException, RestClientException {
            return new AvroSchema(schema);
        }
    };
}
Quinquepartite answered 26/4, 2022 at 12:25 Comment(0)
D
0

As the error says, you need to provide a string to the registry in the producer config, not an object.

Since you're using the Mock class, that string could be anything...

However, you'll need to construct the serializers given the registry instance

Serializer serializer = new KafkaAvroSerializer(mockSchemaRegistry);
// make config map with ("schema.registry.url", "unused")
serializer.configure(config, false);

Otherwise, it will try to create a non-mocked client

And put that into the properties

producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializer);
Diggs answered 20/8, 2019 at 22:19 Comment(4)
Thanks for the response. I did try, but I'm getting the following error: component.com.ms.listener.ConsumeKafkaMessageTest > consumeKafkaMessage_receive_sucess() FAILED org.apache.kafka.common.errors.SerializationException: Error serializing Avro message Caused by: java.net.UnknownHostException: foo at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:589) - Celadon
Hmm. I think you'll also need to use the Mock Producer and Consumer... Otherwise, it seems you'll need an actual mock web server like github.com/bakdata/fluent-kafka-streams-tests/blob/master/… - Diggs
I believe you meant serializer.configure(config, false); If yes, what values go into config? - Celadon
You can move the values from the producer/consumer properties using KafkaAvroSerializerConfig into that config. - Diggs
V
0

If your @KafkaListener is in the test class, you can read the message with StringDeserializer and then convert it to the desired class manually:

    @Autowired
    private MyKafkaAvroDeserializer myKafkaAvroDeserializer;

    @KafkaListener(topics = "test")
    public void inputData(ConsumerRecord<?, ?> consumerRecord) {
        log.info("received payload='{}'", consumerRecord.value());

        // Re-encode the String payload and run it through the mock-backed deserializer.
        GenericRecord genericRecord = (GenericRecord) myKafkaAvroDeserializer.deserialize(
                "test", consumerRecord.value().toString().getBytes(StandardCharsets.UTF_8));

        // Convert the generic record into the generated specific class.
        Myclass myclass = (Myclass) SpecificData.get().deepCopy(Myclass.SCHEMA$, genericRecord);
    }

@Component
public class MyKafkaAvroDeserializer extends KafkaAvroDeserializer {
    @Override
    public Object deserialize(String topic, byte[] bytes) {
        // Swap in a mock client that always returns the known schema.
        this.schemaRegistry = getMockClient(Myclass.SCHEMA$);
        return super.deserialize(topic, bytes);
    }

    private static SchemaRegistryClient getMockClient(final Schema schema$) {
        return new MockSchemaRegistryClient() {
            @Override
            public synchronized org.apache.avro.Schema getById(int id) {
                return schema$;
            }
        };
    }
}

Remember to add the schema registry URL and the key/value deserializers in application.yml, although the registry URL won't actually be used:

    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      schema.registry.url: http://localhost:8080
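One caveat worth checking with the String-based listener: Avro payloads are binary, and decoding arbitrary bytes as UTF-8 and then encoding them again is not guaranteed to return the same bytes. The JDK-only sketch below demonstrates the effect; BinaryRoundTrip is a hypothetical helper name, and the sample byte arrays are made up for illustration rather than taken from a real record.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Shows what happens to binary data that is routed through a UTF-8 String. */
final class BinaryRoundTrip {
    private BinaryRoundTrip() {}

    /** Decodes bytes as UTF-8 text and encodes them again, as a String-based listener would. */
    static byte[] throughString(byte[] original) {
        String asText = new String(original, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    /** True when the bytes come back unchanged from the String round trip. */
    static boolean survives(byte[] original) {
        return Arrays.equals(original, throughString(original));
    }
}
```

Payloads that contain only bytes below 0x80 survive the round trip, but a byte such as 0x86 that is not valid UTF-8 on its own is replaced on decoding, so the re-encoded array no longer matches. If the test data can contain such bytes, a ByteArrayDeserializer on the consumer side may be the safer choice.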
Vagabond answered 22/10, 2021 at 13:47 Comment(0)
K
0

Instead of using new MockSchemaRegistryClient() you should use MockSchemaRegistry.getClientForScope("dummy-registry"). This should match a schema.registry.url = mock://dummy-registry, which will hook the consumer together with your test.
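To illustrate why the scope name matters, here is a toy, JDK-only model of the idea. It is not Confluent's implementation: ScopedRegistrySketch and its methods are hypothetical, and a plain map of schema ID to schema JSON stands in for the registry client. The point it tries to show is that a client created with new in the test is a different object from the one the consumer builds from its config, whereas looking both up by the same scope name yields one shared instance.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy stand-in for a scoped mock registry; NOT the Confluent implementation. */
final class ScopedRegistrySketch {
    private static final String MOCK_PREFIX = "mock://";
    // One shared schema store (schema ID -> schema JSON) per scope name.
    private static final Map<String, Map<Integer, String>> SCOPES = new ConcurrentHashMap<>();

    private ScopedRegistrySketch() {}

    /** Test side: fetch (or create) the store registered under a scope name. */
    static Map<Integer, String> clientForScope(String scope) {
        return SCOPES.computeIfAbsent(scope, s -> new ConcurrentHashMap<>());
    }

    /** Serializer side: resolve a mock:// URL from the config to the same store. */
    static Map<Integer, String> clientForUrl(String url) {
        if (!url.startsWith(MOCK_PREFIX)) {
            throw new IllegalArgumentException("Not a mock registry URL: " + url);
        }
        return clientForScope(url.substring(MOCK_PREFIX.length()));
    }
}
```

In this model, a schema stored through clientForScope("dummy-registry") is visible through clientForUrl("mock://dummy-registry"), which mirrors how the test and the consumer end up sharing one mock registry.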

Kristopher answered 15/4, 2024 at 10:33 Comment(0)
