What I want to achieve: functional tests for a Kafka Streams topology (using TopologyTestDriver) that processes Avro records.
Issue: I can't "mock" the Schema Registry to automate schema publishing and reading.
What I have tried so far: using MockSchemaRegistryClient to mock the Schema Registry, but I don't know how to link it to the Avro Serde.
Code
public class SyncronizerIntegrationTest {

    private ConsumerRecordFactory<String, Tracking> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();

    @Test
    void integrationTest() throws IOException, RestClientException {
        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); // Not sure whether this does anything

        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());

        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));
        unmappedOrdersStream
                .filter((k, v) -> v != null).to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
    }
}
AvroSerde method
private <T extends SpecificRecord> Serde<T> getAvroSerde() {
    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}
If I run the test without the line testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
it works (everything appears to be set up properly).
But when I try to insert data (pipeInput), it throws the following exception, even though the Tracking object is fully populated:
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)
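The trace points at the serializer created inline for ConsumerRecordFactory (new SpecificAvroSerializer<>()), which is never configured and never sees mockSchemaRegistryClient. One plausible reading, not confirmed here, is that its registry client is still null when serialize is called. The sketch below models only that wiring problem with plain Java: RegistryClient, InMemoryRegistry and ToySerializer are hypothetical stand-ins, not Confluent classes, and the subject naming is an assumption made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RegistryWiringSketch {

    /** Hypothetical stand-in for a schema registry client (not the Confluent interface). */
    interface RegistryClient {
        int register(String subject, String schema);
    }

    /** In-memory registry, playing the role MockSchemaRegistryClient plays in the test. */
    static class InMemoryRegistry implements RegistryClient {
        private final Map<String, Integer> ids = new HashMap<>();

        @Override
        public int register(String subject, String schema) {
            // Same subject and schema always map to the same id; new pairs get the next id.
            String key = subject + ":" + schema;
            Integer id = ids.get(key);
            if (id == null) {
                id = ids.size() + 1;
                ids.put(key, id);
            }
            return id;
        }
    }

    /** Toy serializer: the registry field stays null unless a client is injected. */
    static class ToySerializer {
        private RegistryClient registry;

        ToySerializer() { }

        ToySerializer(RegistryClient registry) {
            this.registry = registry;
        }

        byte[] serialize(String topic, String schema, String payload) {
            // Throws NullPointerException when no client was ever supplied.
            int id = registry.register(topic + "-value", schema);
            return (id + "|" + payload).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        // Case 1: serializer built with the no-arg constructor and never wired up.
        try {
            new ToySerializer().serialize("topic", "Tracking", "{}");
        } catch (NullPointerException e) {
            System.out.println("unconfigured serializer: NullPointerException");
        }

        // Case 2: the same registry instance is handed to the serializer explicitly.
        RegistryClient shared = new InMemoryRegistry();
        byte[] bytes = new ToySerializer(shared).serialize("topic", "Tracking", "{}");
        System.out.println(new String(bytes, StandardCharsets.UTF_8));
    }
}
```

In the real test the analogous step would be to hand mockSchemaRegistryClient to the serde and reuse that serde's serializer in the ConsumerRecordFactory, so the topology and the record factory share one registry. Whether the Confluent version in use exposes a constructor for that is something to check against its Javadoc.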
Edit: I have left the above in place as a "history log" of the path I followed.
MockSchemaRegistry#register(String subject, Schema schema)? – Stricker