How to have KafkaProducer use a mock Schema Registry for testing?
I'm using KafkaProducer in my test cases, and my producer uses a schemaRegistryUrl that points at my local instance of Schema Registry. Is there a way to mock how KafkaProducer connects to the Schema Registry? That is, to have KafkaProducer/Consumer work in my tests without a running instance of Schema Registry.

Welles answered 23/9, 2016 at 22:59 Comment(1)
It would probably help if you were clearer about what you are testing here. Do you need the Schema Registry functionality? If you do, then you can't really test it with a mock. If you don't need to test it, then what are you testing? – Finale

https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerDe.java

In 5.3.x, this class defines MOCK_URL_PREFIX = "mock://", so just set the test schemaRegistryUrl with the prefix "mock://", like "mock://testurl". The serializer will then use an in-memory mock client instead of making HTTP calls.
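For example, a producer configuration for tests might look like the sketch below. The property names are the standard Kafka/Confluent ones; the scope name "testurl" and the class name MockRegistryConfig are placeholders chosen for illustration:

```java
import java.util.Properties;

public class MockRegistryConfig {

    // Builds producer properties that point the Avro serializer at the
    // in-memory mock registry instead of a real HTTP endpoint.
    static Properties testProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // The "mock://" prefix makes the serializer create a
        // MockSchemaRegistryClient scoped to "testurl" -- no running
        // Schema Registry process is contacted.
        props.put("schema.registry.url", "mock://testurl");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(
                testProducerProps().getProperty("schema.registry.url"));
    }
}
```

All serializers and deserializers configured with the same mock URL share the same in-memory registry scope, so a producer and consumer in one test can exchange schemas through it.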

Denature answered 6/1, 2020 at 22:43 Comment(4)
The GitHub URL has moved. Can you update it? Btw, just adding mock://testurl for my schemaRegistryUrl property worked like a charm! – Hansiain
I second @VivekSethi's request. Is there any documentation saying to use this? I couldn't find anything anywhere; I came across this solution purely by chance. It works like a charm. – Harvest
Works great for me too. It would be great if you could add a link explaining what exactly happens when a mock:// URL is requested, or add your own explanation. Is there anything else it can be used for in tests, or is it only for mocking the Schema Registry? – Higginson
New links: github.com/confluentinc/schema-registry/blob/… and github.com/confluentinc/schema-registry/blob/… – Selie

There absolutely is. KafkaAvroSerializer and KafkaAvroDeserializer both have a constructor that takes a SchemaRegistryClient, and you can pass in a MockSchemaRegistryClient. Here's a code snippet showing how to do that:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
// The URL is required by configure() but never contacted, since the
// serde already holds the mock client.
private String registryUrl = "unused";

public <T> Serde<T> getAvroSerde(boolean isKey) {
    return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}

private <T> Serializer<T> getSerializer(boolean isKey) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroDeserializerConfig.AUTO_REGISTER_SCHEMAS, true);
    map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
    serializer.configure(map, isKey);
    return serializer;
}

private <T> Deserializer<T> getDeserializer(boolean key) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
    deserializer.configure(map, key);
    return deserializer;
}
Kleon answered 2/5, 2018 at 18:45 Comment(1)
And what if the code is not instantiating serializers, but simply specifying the class in Kafka properties? E.g. value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer – Iaea

You can set the producer property schema.registry.url to a mock URL:

schema.registry.url: "mock://my-scope"

The value after the two slashes is the scope. In your integration test you can retrieve the SchemaRegistryClient for the defined scope and register, for example, a JSON schema:

SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("my-scope");
client.register("my-subject", new JsonSchema("{...}"));
Neelyneeoma answered 19/1, 2023 at 10:17 Comment(1)
Thanks! This solved everything for me after having tried new MockSchemaRegistryClient() without success. – Inoue

You can do it by creating your own custom KafkaAvroSerializer.

In your application.yml for tests, instead of io.confluent.kafka.serializers.KafkaAvroSerializer, use the custom class as below:

producer:
  value-serializer: com.project.application.custom.MycustomKafkaAvroSerializer

package com.project.application.custom;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

import java.util.Map;

public class MycustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public MycustomKafkaAvroSerializer() {
        super();
        // Replace the registry client with the in-memory mock.
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public MycustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public MycustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

Also set a value for the schema registry URL; it won't be used, but it must not be left blank.

properties:
  schema.registry.url: http://localhost:8080
Edveh answered 21/10, 2021 at 10:54 Comment(0)
