Kafka Streams: Failed to flush state store caused by java.lang.ClassCastException: cannot cast key to value

We have been using Kafka Streams for a while but never got around to writing tests for our topologies. We decided to give it a go and use the TopologyTestDriver provided by the Streams library. Unfortunately we hit an issue we cannot resolve. Here is a dummy version of our production code with the same semantics.

It joins two topics containing two types of documents. Our aim is to aggregate the documents into a "folder" per person, combining info from the different documents. When running the test we hit an exception caused by a bad cast from PersonKey to DocumentA. Below you can see the test setup, the schemas of the data structures and the stack trace of the exception.

package com.zenjob.me.indexer.application.domain;

import com.demo.DocumentA;
import com.demo.DocumentB;
import com.demo.DocumentFolder;
import com.demo.PersonKey;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import lombok.extern.log4j.Log4j2;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.junit.Assert;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SuppressWarnings("SimplifiableJUnitAssertion")
@Log4j2
class DemoTest {

    private SchemaRegistryClient schemaRegistryClient     = new MockSchemaRegistryClient();
    private String               documentATopicName       = "documentATopicName";
    private String               documentBTopicName       = "documentBTopicName";
    private String               documentFoldersTopicName = "documentFoldersTopicName";

    private <T extends SpecificRecord> SpecificAvroSerde<T> getSerde(boolean isForKey) {
        Map<String, Object> serdeConfig = new HashMap<>();
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wat-ever-url-anyway-it-is-mocked");

        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
        serde.configure(serdeConfig, isForKey);
        return serde;
    }

    @Test
    void test() {

        StreamsBuilder builder = new StreamsBuilder();

        SpecificAvroSerde<PersonKey> keySerde = this.getSerde(true);
        SpecificAvroSerde<DocumentA> documentASerde = this.getSerde(false);
        SpecificAvroSerde<DocumentB> documentBSerde = this.getSerde(false);
        SpecificAvroSerde<DocumentFolder> documentFolderSerde = this.getSerde(false);

        KTable<PersonKey, DocumentA> docATable = builder.table(documentATopicName, Consumed.with(keySerde, documentASerde), Materialized.with(keySerde, documentASerde));
        KTable<PersonKey, DocumentB> docBTable = builder.table(documentBTopicName, Consumed.with(keySerde, documentBSerde), Materialized.with(keySerde, documentBSerde));

        docATable
                .mapValues(documentA ->
                                DocumentFolder.newBuilder()
                                        .setPropertyA(documentA.getPropertyA())
                                        .build(),
                        Materialized.with(keySerde, documentFolderSerde))
                .leftJoin(docBTable,
                        (folder, documentB) -> {
                            if (documentB == null) {
                                return folder;
                            }
                            return DocumentFolder.newBuilder(folder)
                                    .setPropertyB(documentB.getPropertyB())
                                    .build();
                        },
                        Materialized.with(keySerde, documentFolderSerde)
                )
                .toStream()
                .to(documentFoldersTopicName, Produced.with(keySerde, documentFolderSerde));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

        ConsumerRecordFactory<PersonKey, DocumentA> documentAConsumerRecordFactory = new ConsumerRecordFactory<>(documentATopicName, keySerde.serializer(), documentASerde.serializer());
        ConsumerRecordFactory<PersonKey, DocumentB> documentBConsumerRecordFactory = new ConsumerRecordFactory<>(documentBTopicName, keySerde.serializer(), documentBSerde.serializer());

        // When
        String personId = "person-id";
        PersonKey key = PersonKey.newBuilder().setPropertyA(personId).build();
        DocumentA documentA = DocumentA.newBuilder().setPropertyA("docA-propA").build();
        DocumentB documentB = DocumentB.newBuilder().setPropertyB("docB-propB").build();

        driver.pipeInput(documentAConsumerRecordFactory.create(key, documentA));
        driver.pipeInput(documentBConsumerRecordFactory.create(key, documentB));

        ProducerRecord<PersonKey, DocumentFolder> output1 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());
        ProducerRecord<PersonKey, DocumentFolder> output2 = driver.readOutput(documentFoldersTopicName, keySerde.deserializer(), documentFolderSerde.deserializer());

        log.info(output1);
        log.info(output2);

        Assert.assertEquals(documentA.getPropertyA(), output1.value().getPropertyA());
        Assert.assertEquals(null, output1.value().getPropertyB());

        Assert.assertEquals(documentA.getPropertyA(), output2.value().getPropertyA());
        Assert.assertEquals(documentB.getPropertyB(), output2.value().getPropertyB());

        driver.close();
    }
}

DocumentA

{
  "type" : "record",
  "name" : "DocumentA",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

DocumentB

{
  "type" : "record",
  "name" : "DocumentB",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyB",
      "type" : "string"
    }
  ]
}

DocumentFolder

{
  "type" : "record",
  "name" : "DocumentFolder",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    },
    {
      "name" : "propertyB",
      "type" : [
        "null",
        "string"
      ],
      "default" : null
    }
  ]
}

PersonKey

{
  "type" : "record",
  "name" : "PersonKey",
  "namespace" : "com.demo",
  "fields" : [
    {
      "name" : "propertyA",
      "type" : "string"
    }
  ]
}

Exception

task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store documentATopicName-STATE-STORE-0000000000
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:251)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:521)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:473)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:461)
    at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:421)
    at com.zenjob.me.indexer.application.domain.DemoTest.test(DemoTest.java:97)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:532)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:171)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:167)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:114)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:59)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:108)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at java.util.ArrayList.forEach(ArrayList.java:1257)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$4(NodeTestTask.java:112)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:72)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:98)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:74)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:92)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$100(JUnitPlatformTestClassProcessor.java:77)
    at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:73)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy2.stop(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.stop(TestWorker.java:131)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
    at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: com.demo.PersonKey cannot be cast to com.demo.DocumentA
    at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.computeValue(KTableMapValues.java:78)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues.access$400(KTableMapValues.java:27)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:117)
    at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:97)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
    at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$setFlushListener$1(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:92)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:272)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:237)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248)
    ... 68 more

Kafka v2.3.0, Avro v1.9.1, KafkaAvroSerde v5.2.1

UPDATE

I tried rewriting the topology with the Processor API, but without any luck. After that I tried using a real schema registry and the test passed, so it appears the issue is with the MockSchemaRegistryClient. I will post another update when I locate the reason.

UPDATE 2

I managed to get it to work with the mock schema registry, but I had to manually register all schemas, including the ones for the state stores and the internal state store changelog topics.
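
For anyone trying the same workaround, here is a minimal sketch of how the subject names to register could be worked out. It is not the code I actually used. It assumes the default TopicNameStrategy (subjects named "<topic>-key" and "<topic>-value") and the Kafka Streams changelog naming "<application.id>-<storeName>-changelog". The SubjectNames class and its methods are hypothetical helpers, and the exact set of stores in a given topology has to be taken from the topology description or from error messages like the one above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Kafka or the Confluent libraries.
// It only computes subject names; registering them is left to the test.
final class SubjectNames {

    private SubjectNames() {
    }

    // Assumes the default TopicNameStrategy: "<topic>-key" and "<topic>-value".
    static List<String> forTopic(String topic) {
        return Arrays.asList(topic + "-key", topic + "-value");
    }

    // Kafka Streams names a store's changelog topic
    // "<application.id>-<storeName>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Subjects for the regular topics plus the changelog topic of each store.
    static List<String> all(String applicationId,
                            List<String> topics,
                            List<String> storeNames) {
        List<String> subjects = new ArrayList<>();
        for (String topic : topics) {
            subjects.addAll(forTopic(topic));
        }
        for (String storeName : storeNames) {
            subjects.addAll(forTopic(changelogTopic(applicationId, storeName)));
        }
        return subjects;
    }
}
```

With the application id "test" and the store name from the stack trace, SubjectNames.changelogTopic("test", "documentATopicName-STATE-STORE-0000000000") gives "test-documentATopicName-STATE-STORE-0000000000-changelog". Each resulting subject can then be passed to schemaRegistryClient.register(subject, schema) together with the matching Avro schema (for example PersonKey.getClassSchema() for the "-key" subjects) before the TopologyTestDriver is created.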

Ambsace answered 6/1, 2020 at 16:16 Comment(6)
This may help you: #46749655 - Kkt
Thanks! Unfortunately in our case it does not work at all, and the deserializer is called, as we discovered during debugging. We traced the exception to a place where the class KTableMapValues calls an internal mapper implementing ValueMapperWithKey. We still don't know the reason or how to fix it. - Ambsace
Is your default stream deserializer for the key set to binary? - Kkt
All the serdes are defined explicitly, so the mock schema registry is used instead of the default ones. - Ambsace
What is the relationship between the DocumentA and PersonKey classes? - Kolnick
There is no relationship. I found the issue and updated the question :) - Ambsace
