Embedded Kafka tests randomly failing
Asked Answered
V

2

6

I implemented a bunch of integration tests using EmbededKafka to test one of our Kafka streams application running using spring-kafka framework.

The stream application is reading a message from a Kafka topic, it stores it into an internal state store, does some transformation and sends it to another micro service into a requested topic. When the response comes back into the responded topic it retrieves the original message from the state store and depending on some business logic it forwards it to one of our downstream systems, each one having their own topic.

The integration tests just exercise the various permutations of the business conditions.

Initially the tests were split across to classes. When running the build the tests from one class were clashing with the ones in the other class with some conflict exceptions. I did not spend too much time on this and just moved all tests inside the same class. This fixed my issue with all tests passing either from gradle build or from intelij EDI.

Here is the test:

package au.nab.tlm.streams.integration;

import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
        ports = 9092,
        partitions = 1,
        topics = {
                "topic-1.v1",
                "topic-2.v1",
                "topic-3.v1",
                "topic-4.v1",
                "topic-5.v1",
                "topic-6.v1",
        },
        brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
    @Autowired
    EmbeddedKafkaBroker kafkaBroker;

    @Autowired
    EntitlementsCheckSerDes appSerDes;

    @Test
    public void test_1() {
    }

    @Test
    public void test_2() {
    }

    @Test
    public void test_3() {
    }

    @Test
    public void test_4() {
    }

    @Test
    public void test_5() {
    }

    @TestConfiguration
    public static class TestKafkaConfig {
        @Bean
        EntitlementsCheckSerDes appSerDes() {
            return new MockEntitlementsCheckSerDes();
        }
    }
}

Happy with the result I pushed my change just to notice the build was failing on our CI server. Running it once again it failed again this time with different failures than first time. I got a colleague to have a look and he was having the same failure experience similar with CI server. I ran the build at least twenty times on my machine and it was always passing. Running tests one by one on my colleague was always passing too.

The most common exception we were getting was that the topic xyz already existed, but occasionally there was some other exceptions suggesting a cluster could not be fond or similar. All these exceptions were indicating to us the the embedded Kafka used in aa previous test was not completely shut down before the next test started, despite using the DirtiesContext annotation.The very first test being run was always passing.

We both spent a full day pulling off our hair, it was no way to get it working. We tried whatever and wherever google took us to, no luck at all. In the end we left the only one test scenario (the one with biggest number of interactions) in the test class and disabled the rest.

Obviously this is not something to be accepted as a permanent solution and I would really want to understand what we done wrong and how could fix it.

Thank you in advance for your inputs.

Valera answered 16/6, 2020 at 6:34 Comment(1)
unless your CI/CD pipeline is using containers you can have port conflict for using a fixed 9292, also dirty temp file and topic data form previous embedded kafka instances. Even though you are using DirtiesContext I don't think it help with the EmbeddedKafka, I would go deeper on the documentation of embedded kafka to how properly clean shutdown and clean up before each testsAndroclinium
A
10

Do not use a fixed port ports = 9092, - by default the embedded kafka will listen on a random port selected by the operating system.

You should use that for your test cases.

You can get the broker addresses by calling this.kafkaBroker.getBrokersAsString().

Alix answered 16/6, 2020 at 14:31 Comment(3)
I have seen this solution in some other questions. When I took the port out all test failed with timeouts as no. processing happened at all. How would production code know what that port is? We have that port configured in application.yaml and is configurable via some environment properties. In other words how would our spring boot application point to the Kafka server created by the test?Valera
Of course, it's correct in the yaml; it's just not correct to use it for tests where you are excplicity setting it @EmbeddedKafka(ports = 9092, .... You can tell boot to use the embedded kafka port by adding bootstrapServersProperty = "spring.kafka.bootstrap-servers" to the @EmbeddedKafka annotation; it will override the yaml value - see the documentation.Alix
That was so true Gary. I am new to this and mostly copying and pasting from some others work but yes I do need a good doco readValera
S
0

Try Removing @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @EmbeddedKafka(

Scintillation answered 15/3, 2024 at 15:24 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.