I'm using KafkaProducer in my test cases, and my producer uses a schemaRegistryUrl that points to my local instance of Schema Registry. Is there a way to mock how KafkaProducer connects to the Schema Registry? That is, can the KafkaProducer/Consumer in my tests work without a running instance of Schema Registry?
In 5.3.x, there is a constant MOCK_URL_PREFIX = "mock://", so just set the test schemaRegistryUrl to a value with the "mock://" prefix, for example "mock://testurl".
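As a rough sketch of what that could look like, the producer properties for a test might be assembled as below. The class name MockRegistryProducerConfig, the broker address localhost:9092, and the scope name passed in are placeholders chosen for illustration; only the property keys and the "mock://" prefix come from Kafka and the answer above. Plain string keys are used so the snippet compiles without any Kafka dependency.

```java
import java.util.Properties;

public class MockRegistryProducerConfig {

    // Builds producer properties whose registry URL uses the mock:// prefix.
    // The broker address is an assumed test value, not something from the question.
    public static Properties build(String scope) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // With this prefix the serializer should not need a running Schema Registry.
        props.setProperty("schema.registry.url", "mock://" + scope);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("testurl");
        System.out.println(props.getProperty("schema.registry.url")); // prints mock://testurl
    }
}
```

These properties would then be passed to the KafkaProducer constructor in the test in place of the real registry URL.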
There absolutely is. The KafkaAvroSerializer and KafkaAvroDeserializer both have a constructor that takes in a SchemaRegistryClient. You can use a MockSchemaRegistryClient as the SchemaRegistryClient. Here's a code snippet showing how to do that:
// In-memory registry shared by the serializer and the deserializer.
private MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
// Must be set, but is never contacted because the mock client is injected.
private String registryUrl = "unused";

public <T> Serde<T> getAvroSerde(boolean isKey) {
    return Serdes.serdeFrom(getSerializer(isKey), getDeserializer(isKey));
}

private <T> Serializer<T> getSerializer(boolean isKey) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
    map.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Serializer<T> serializer = (Serializer) new KafkaAvroSerializer(mockSchemaRegistryClient);
    serializer.configure(map, isKey);
    return serializer;
}

private <T> Deserializer<T> getDeserializer(boolean isKey) {
    Map<String, Object> map = new HashMap<>();
    map.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    map.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
    Deserializer<T> deserializer = (Deserializer) new KafkaAvroDeserializer(mockSchemaRegistryClient);
    deserializer.configure(map, isKey);
    return deserializer;
}
You can set the producer property schema.registry.url to a mock URL:
schema.registry.url: "mock://my-scope"
The value after the two slashes is the scope.
In your integration test you can retrieve the SchemaRegistryClient for the defined scope and register, for example, a JSON schema:
SchemaRegistryClient client = MockSchemaRegistry.getClientForScope("my-scope");
client.register("my-subject", new JsonSchema("{...}"));
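To make the relationship between the URL and the scope concrete, here is a small sketch of how a test could derive the scope name from the configured URL, so that the same string is passed to MockSchemaRegistry.getClientForScope. The MockScope class and its scopeOf method are hypothetical helpers written for this illustration, not part of the Confluent library, and they assume the scope is simply everything after "mock://".

```java
public class MockScope {

    private static final String MOCK_URL_PREFIX = "mock://";

    // Returns the part of a mock registry URL that follows "mock://".
    // Hypothetical helper: it rejects anything that is not a mock URL.
    public static String scopeOf(String url) {
        if (url == null || !url.startsWith(MOCK_URL_PREFIX)) {
            throw new IllegalArgumentException("Not a mock registry URL: " + url);
        }
        return url.substring(MOCK_URL_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(scopeOf("mock://my-scope")); // prints my-scope
    }
}
```

Keeping the URL in one constant and deriving the scope from it avoids the producer and the test silently using two different scopes.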
You can do it by creating your own custom KafkaAvroSerializer.
In your application.yml for tests, use the custom class instead of io.confluent.kafka.serializers.KafkaAvroSerializer, as below:
producer:
  value-serializer: com.project.application.custom.MycustomKafkaAvroSerializer
package com.project.application.custom;

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Map;

// Test-only serializer that always talks to an in-memory mock registry.
public class MycustomKafkaAvroSerializer extends KafkaAvroSerializer {

    public MycustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    // The client passed in is deliberately ignored in favour of a mock.
    public MycustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public MycustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}
Also set a value for the schema registry URL. It won't be used, but it must not be left blank.
properties:
  schema.registry.url: http://localhost:8080