We have been using Kafka Streams for a while, but we never got around to writing tests that cover our topologies. We decided to give it a go using the TopologyTestDriver provided by the streams library. Unfortunately, we hit an issue we cannot resolve. Here is a dummy version of our production code with the same semantics.
It joins two topics containing two types of documents. Our aim is to aggregate the documents into a "folder" per person, combining information from the different documents. When running the test we hit an exception caused by a bad cast from PersonKey to DocumentA. Below you can find the test setup, the schemas of the data structures, and the stack trace of the exception.
package com.zenjob.me.indexer.application.domain;

import com.demo.DocumentA;
import com.demo.DocumentB;
import com.demo.DocumentFolder;
import com.demo.PersonKey;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SuppressWarnings("SimplifiableJUnitAssertion")
@Log4j2
class DemoTest {

    private SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
    private String documentATopicName = "documentATopicName";
    private String documentBTopicName = "documentBTopicName";
    private String documentFoldersTopicName = "documentFoldersTopicName";

    // All serdes share the same mock schema registry client.
    private <T extends SpecificRecord> SpecificAvroSerde<T> getSerde(boolean isForKey) {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wat-ever-url-anyway-it-is-mocked");
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
        serde.configure(serdeConfig, isForKey);
        return serde;
    }

    @Test
    void test() {
        StreamsBuilder builder = new StreamsBuilder();
        SpecificAvroSerde<PersonKey> keySerde = this.getSerde(true);
        SpecificAvroSerde<DocumentA> documentASerde = this.getSerde(false);
        SpecificAvroSerde<DocumentB> documentBSerde = this.getSerde(false);
        SpecificAvroSerde<DocumentFolder> documentFolderSerde = this.getSerde(false);

        KTable<PersonKey, DocumentA> docATable = builder.table(documentATopicName, Consumed.with(keySerde, documentASerde), Materialized.with(keySerde, documentASerde));
        KTable<PersonKey, DocumentB> docBTable = builder.table(documentBTopicName, Consumed.with(keySerde, documentBSerde), Materialized.with(keySerde, documentBSerde));

        // Map each DocumentA into a folder, then left-join DocumentB into it.
        docATable
                .mapValues(documentA ->
                                DocumentFolder.newBuilder()
                                        .setPropertyA(documentA.getPropertyA())
                                        .build(),
                        Materialized.with(keySerde, documentFolderSerde))
                .leftJoin(docBTable,
                        (folder, documentB) -> {
                            if (documentB == null) {
                                return folder;
                            }
                            return DocumentFolder.newBuilder(folder)
                                    .setPropertyB(documentB.getPropertyB())
                                    .build();
                        },
                        Materialized.with(keySerde, documentFolderSerde))
                .toStream()
                .to(documentFoldersTopicName, Produced.with(keySerde, documentFolderSerde));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

        ConsumerRecordFactory<PersonKey, DocumentA> documentAConsumerRecordFactory = new ConsumerRecordFactory<>(documentATopicName, keySerde.serializer(), documentASerde.serializer());
        ConsumerRecordFactory<PersonKey, DocumentB> documentBConsumerRecordFactory = new ConsumerRecordFactory<>(documentBTopicName, keySerde.serializer(), documentBSerde.serializer());

        // When
        String personId = "person-id";
        PersonKey key = PersonKey.newBuilder().setPropertyA(personId).build();
        DocumentA documentA = DocumentA.newBuilder().setPropertyA("docA-propA").build();
        DocumentB documentB = DocumentB.newBuilder().setPropertyB("docB-propB").build();
        driver.pipeInput(documentAConsumerRecordFactory.create(key, documentA));
        driver.pipeInput(documentBConsumerRecordFactory.create(key, documentB));

        // Then
        ProducerRecord<PersonKey, DocumentFolder> output1 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());
        ProducerRecord<PersonKey, DocumentFolder> output2 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());
        log.info(output1);
        log.info(output2);
        Assert.assertEquals(documentA.getPropertyA(), output1.value().getPropertyA());
        Assert.assertEquals(null, output1.value().getPropertyB());
        Assert.assertEquals(documentA.getPropertyA(), output2.value().getPropertyA());
        Assert.assertEquals(documentB.getPropertyB(), output2.value().getPropertyB());

        driver.close();
    }
}
DocumentA
{
  "type" : "record",
  "name" : "DocumentA",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}
DocumentB
{
  "type" : "record",
  "name" : "DocumentB",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyB",
      "type" : "string"
    }
  ]
}
DocumentFolder
{
  "type" : "record",
  "name" : "DocumentFolder",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    },
    {
      "name" : "propertyB",
      "type" : [ "null", "string" ],
      "default" : null
    }
  ]
}
PersonKey
{
  "type" : "record",
  "name" : "PersonKey",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}
Exception
task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:421)
at com.zenjob.me.indexer.application.domain.DemoTest.test(DemoTest.java:97)
... (JUnit platform and Gradle test-runner frames omitted)
Caused by: java.lang.ClassCastException: com.demo.PersonKey cannot be cast to com.demo.DocumentA
at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
at org.apache.kafka.streams.kstream.internals.KTableMapValues.computeValue(KTableMapValues.java:78)
at org.apache.kafka.streams.kstream.internals.KTableMapValues.access$400(KTableMapValues.java:27)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:117)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:97)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
... 68 more
Kafka v2.3.0, Avro v1.9.1, KafkaAvroSerde v5.2.1
UPDATE
I tried rewriting the topology with the Processor API, but without any luck. I then tried using a real schema registry, and the test passed, so the issue appears to lie with the MockSchemaRegistryClient. I will post another update when I locate the reason.
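For reference, the real-registry variant only changed how the serdes are built. A minimal sketch, assuming a registry reachable at http://localhost:8081 (the URL is a placeholder, and getRealRegistrySerde is a hypothetical name):

// Sketch of the serde factory pointed at a real schema registry. Without a
// client passed to the constructor, the serde creates its own
// CachedSchemaRegistryClient from the configured URL.
private <T extends SpecificRecord> SpecificAvroSerde<T> getRealRegistrySerde(boolean isForKey) {
    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"); // placeholder URL
    SpecificAvroSerde<T> serde = new SpecificAvroSerde<>();
    serde.configure(serdeConfig, isForKey);
    return serde;
}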
UPDATE 2
I managed to get it to work with the mock schema registry, but I had to manually register all schemas, including those of the state stores and the internal state store changelog topics.
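Roughly, the registration looked like the sketch below. It assumes the default subject naming (<topic>-key / <topic>-value) and Streams' default changelog naming (<applicationId>-<storeName>-changelog); the generated store names, such as documentATopicName-STATE-STORE-0000000000 from the stack trace, can be listed with builder.build().describe().

// Minimal sketch: pre-register every schema with the shared
// MockSchemaRegistryClient before constructing the TopologyTestDriver.
// Subject names assume the default TopicNameStrategy; the changelog subjects
// are derived from Streams' <applicationId>-<storeName>-changelog convention.
private void registerSchemas() throws Exception {
    // Source topics
    schemaRegistryClient.register(documentATopicName + "-key", PersonKey.getClassSchema());
    schemaRegistryClient.register(documentATopicName + "-value", DocumentA.getClassSchema());
    schemaRegistryClient.register(documentBTopicName + "-key", PersonKey.getClassSchema());
    schemaRegistryClient.register(documentBTopicName + "-value", DocumentB.getClassSchema());
    // Output topic
    schemaRegistryClient.register(documentFoldersTopicName + "-key", PersonKey.getClassSchema());
    schemaRegistryClient.register(documentFoldersTopicName + "-value", DocumentFolder.getClassSchema());
    // Internal changelog topics; "test" is the application id and the store
    // name comes from the stack trace. Repeat for every store reported by
    // builder.build().describe() (the mapValues and join stores as well).
    schemaRegistryClient.register("test-documentATopicName-STATE-STORE-0000000000-changelog-key", PersonKey.getClassSchema());
    schemaRegistryClient.register("test-documentATopicName-STATE-STORE-0000000000-changelog-value", DocumentA.getClassSchema());
}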