Kafka Streams Testing : java.util.NoSuchElementException: Uninitialized topic: "output_topic_name"
Asked Answered
A

1

10

I've written a test class for kafka stream application as per https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html , the code for which is

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {
    
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String test_dummy = "dummy:1234";

@Before
public void setup() {
    Topology topology = new Topology();
      
    topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);

    topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);

    topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
            , ProcessRouter.class.getSimpleName());

    topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
            , WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
            , WorkforceVisit.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);       
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());
        
    testDriver = new TopologyTestDriver(topology, properties);

    //setup test topics
    inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
    outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());

}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    Object b =  outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);       
}

where I used EventSerde class to serialize and deserialize the value.

When I run this code it gives the error java.util.NoSuchElementException: Uninitialized topic: processed_events with the following stacktrace:

java.util.NoSuchElementException: Uninitialized topic: processed_events

at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

As you can see i have initialized both input and output topics. I have also debugged the code and the error occurs when i read the value from output topic

outputTopic.readValue();

I don't understand what else i should do to initialize the outputTopic. Can anyone help me with this problem?

I am using apache kafka-streams-test-utils 2.4.0 and kafka-streams 2.4.0

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>
Automatism answered 30/1, 2020 at 14:2 Comment(3)
The error message is a little bit miss leading -- it means, that no output record was ever written into the output topic. Hence, I assume that the problem is with your topology.Massasauga
Matthias is right - this expection is totally misleading.Callable
What was the issue? I mean, what solved it?Amatol
C
12

The exception is misleading. It just means that no output record was ever written into the output topic. Hence, the problem lies with the defined topology.

To avoid the misleading exception, you can check if your output topic is not empty before trying to read from it. That way you'll get a clearer picture:

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    assert(outputTopic.isEmpty(), false);
    Object b = outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);
}
Callable answered 5/2, 2020 at 17:46 Comment(4)
Exception also occurs when you try to read value from outputTopic i.e. outputTopic.readValue().. Is there any work around!!Kehoe
You need to check if data is available in the output topic before reading by calling outputTopic.isEmpty(). Alternatively you can use the readAsList* methods to read value from outputTopic.Callable
Thank you.. It worked. Actually, the problem was inside the topology only..Kehoe
What was it @Kehoe ?Amatol

© 2022 - 2025 — McMap. All rights reserved.