How can I do functional tests for Kafka Streams with Avro (Schema Registry)?
  • A brief explanation of what I want to achieve: I want to do functional tests for a Kafka Streams topology (using TopologyTestDriver) with Avro records.

  • Issue: I can't "mock" the Schema Registry to automate schema publishing/reading.

So far I have tried using MockSchemaRegistryClient to mock the Schema Registry, but I don't know how to link it to the Avro Serde.

Code

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); // Dunno if this does anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("output");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

AvroSerde method

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

If I run the test without testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking())), it works fine (everything looks properly set up).

But

When I try to insert data (pipeInput), it throws the following exception. (The "Tracking" object is fully populated.)

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Edit: I didn't delete the above; I kept it as a "history log" to show the path I followed.

Tusker answered 10/10, 2018 at 9:44 Comment(1)
Did you add the schema via MockSchemaRegistry#register(String subject, Schema schema)? – Stricker

Confluent provides a plethora of example code for testing Kafka (Streams) alongside the Schema Registry.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Most importantly, mocking isn't a complete integration test; starting an actual Kafka broker with an in-memory Schema Registry is.

In the above code, see

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

And

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
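
For orientation, here is a condensed (untested) sketch of how that example wires topics and configuration to the embedded cluster; method names such as createTopic(), bootstrapServers() and schemaRegistryUrl() come from the linked example project's test utilities, so verify them against the version you import:

@BeforeClass
public static void createTopics() throws Exception {
    // Create the topics the topology reads from and writes to
    CLUSTER.createTopic("inputTopic");
    CLUSTER.createTopic("outputTopic");
}

Properties streamsConfiguration = new Properties();
// Point both Kafka Streams and the Avro serdes at the embedded cluster
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());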
Spokeswoman answered 10/10, 2018 at 13:5 Comment(2)
Would you know what Maven repository I'd have to import to use this EmbeddedSingleNodeKafkaCluster? From the comments in github.com/confluentinc/kafka-streams-examples/issues/26 it would appear it's not actually available yet. – Professorate
I use kafka_2.11, classifier=test, scope=test – Spokeswoman

Disclaimer: I have not tested this; these are just some ideas about how you might be able to make it work. I hope this helps. If you can provide feedback on this answer, it would be great to arrive at a correct and working solution.

I don't think you can use the regular Avro Serde via config:

props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());

From my understanding, this will try to connect to the URL configured in

props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

However, when using MockSchemaRegistryClient there is no HTTP endpoint to connect to. Instead, you need to pass the mock client into the Serde when you create it:

MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
// add the schemas you want to use
schemaRegistryClient.register(...);
SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);

Thus, you just configure a "dummy" HTTP endpoint, because the provided mock client won't use it anyway.

Passing in the corresponding Serde programmatically, as in the question, seems to be correct:

kStreamBuilder.stream("topic", Consumed.with(Serdes.String(), avroSerde));
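
Putting the pieces together, here is a minimal, untested sketch. It assumes the Confluent 5.0.x API (where the SpecificAvroSerde(SchemaRegistryClient) constructor is public, see the comments below) and the default TopicNameStrategy subject naming of "<topic>-value":

MockSchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
// Register the value schema under the subject the serializer will look up
// ("<topic>-value" under the default TopicNameStrategy)
schemaRegistryClient.register("topic-value", Tracking.getClassSchema());

SpecificAvroSerde<Tracking> avroSerde = new SpecificAvroSerde<>(schemaRegistryClient);
Map<String, String> serdeConfig = new HashMap<>();
// The URL is never contacted by the mock client, but the property must be
// present or configure() complains
serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://dummy:8081");
avroSerde.configure(serdeConfig, false); // `false` because this is a value Serde

// Reuse the same serializer for test input, so it goes through the same mock registry:
ConsumerRecordFactory<String, Tracking> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), avroSerde.serializer());

This would likely also explain the NullPointerException in the question: the bare new SpecificAvroSerializer<>() handed to the ConsumerRecordFactory was never configured, so its internal Schema Registry client is null when serializeImpl() runs.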
Stricker answered 10/10, 2018 at 18:32 Comment(4)
"Thus, you don't need to configure any http endpoint because the provide mock client won't use it anyway." It's required by the constructor, throws an exception if the property isn't filledTusker
Hi @MatthiasJ.Sax, the constructor SpecificAvroSerde<>(schemaRegistryClient) has package-private access and can't be used. I tried using KafkaAvroDeserializer and passing it, but that doesn't help either; it still seems to be trying to fetch the schema from the configured URL. – Saragossa
Not sure what version you are using, but the current 5.0 release has a public constructor: github.com/confluentinc/schema-registry/blob/5.0.0-post/… – Stricker
Works like a gem! – Boatyard

The approach that worked best for us is Java Testcontainers with Confluent Platform Docker images. You can set up the following Docker Compose file:

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.0.0
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    environment:
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
    ports:
      - 8081:8081
    depends_on:
      - zookeeper
      - kafka

The only thing you need to do is add 127.0.0.1 kafka to /etc/hosts. With this approach you essentially have the whole cluster up and running for your integration test, and it is destroyed once the test finishes.
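
If you drive this Compose file from JUnit, a hedged sketch using Testcontainers' DockerComposeContainer looks like the following (the class comes from the org.testcontainers:testcontainers artifact; the "_1" service-name suffix is the library's convention for Compose services):

@ClassRule
public static DockerComposeContainer environment =
        new DockerComposeContainer(new File("src/test/resources/docker-compose.yml"))
                // Block until the mapped ports are reachable before tests run
                .withExposedService("kafka_1", 9092)
                .withExposedService("schema-registry_1", 8081);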

EDIT:

A better docker-compose setup that does not require modifying /etc/hosts:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.0.0
    hostname: zookeeper
    ports:
      - '32181:32181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000
    extra_hosts:
      - "moby:127.0.0.1"

  kafka:
    image: confluentinc/cp-kafka:5.0.0
    hostname: kafka
    ports:
      - '9092:9092'
      - '29092:29092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    extra_hosts:
      - "moby:127.0.0.1"

  schema-registry:
    image: confluentinc/cp-schema-registry:5.0.0
    hostname: schema-registry
    depends_on:
      - zookeeper
      - kafka
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181
    extra_hosts:
      - "moby:127.0.0.1"

Kafka will be available on localhost:9092
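
A hedged example of pointing a test client at the listeners exposed above (the PLAINTEXT_HOST listener for Kafka, and the Schema Registry's published port):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");           // PLAINTEXT_HOST listener
props.put("schema.registry.url", "http://localhost:8081");  // schema-registry container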

Blenny answered 10/10, 2018 at 19:14 Comment(2)
Editing your hosts file is wrong; please refer to rmoff.net/2018/08/02/kafka-listeners-explained. Still, this approach is a lot of overhead compared to starting an in-memory broker, ZooKeeper, etc., because they're all Java processes already, so importing them isn't difficult (Kafka itself doesn't run its unit/integration tests in containers). – Spokeswoman
I would argue with that; it depends on what you want to achieve. If you want a full integration test with multiple versions of Kafka/Schema Registry to check compatibility etc., I would prefer this approach. I would also dispute the overhead, since with Testcontainers it's very simple to write this kind of integration test. Thanks for the link. – Blenny

For this I ended up writing a small test library based on Testcontainers: https://github.com/vspiliop/embedded-kafka-cluster. It starts a fully configurable, Docker-based Kafka cluster (broker, ZooKeeper, and Confluent Schema Registry) as part of your tests. Check out the example unit and Cucumber tests.

The key difference from other non-Docker-based solutions (e.g. the spring-boot embedded Kafka test) is that the Docker Compose file is "generated" from the @EmbeddedKafkaCluster annotation parameters rather than hardcoded. This means you can configure your tests to match production 100% and be sure that all versions match your production cluster, by setting the Confluent platformVersion.

Furthermore, you can use tools like Toxiproxy to write tests that exercise your real cluster's behaviour when certain network errors occur.

E.g. you can use the @EmbeddedKafkaCluster annotation as follows:

@ContextConfiguration()
@EmbeddedKafkaCluster(topics = {"test.t"}, brokersCount = 1, zookeepersCount = 1, schemaRegistriesCount = 1, platformVersion = "your_production_kafka_confluent_version")
@DirtiesContext(classMode = ClassMode.AFTER_CLASS)
public class FeatureSteps {
    // ... step definitions ...
}
Benzo answered 17/6, 2019 at 13:38 Comment(0)

We are using a Docker Compose based solution to mock Kafka.

Below is a snippet from docker-compose.yml:

  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    healthcheck:
      test: nc -z zookeeper 2181
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      # Default 8080 port clashes with existing services
      KAFKA_OPTS: '-Dzookeeper.admin.serverPort=3181'

  kafka:
    image: confluentinc/cp-kafka:6.1.0
    hostname: kafka
    container_name: kafka
    healthcheck:
      test: nc -z kafka 29092
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:6.1.0
    hostname: schema-registry
    container_name: schema-registry
    healthcheck:
      test: nc -z schema-registry 8081
    restart: always
    depends_on:
      kafka:
        condition: service_healthy
    ports:
      - '8081:8081'
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

We then have Cucumber and Java based step implementations that connect to this mock Kafka setup.

@Given("a test Kafka producer is available")
  public void createProducer() {
    producer = new KafkaProducer<>(getKafkaConnectorProperties());
  }

private Properties getProducerProps() {
    var properties = new Properties();
    // bootstrap.servers takes host:port pairs, without an http:// scheme
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(SCHEMA_REGISTRY_CONFIG, "http://localhost:8081");
    return properties;
}

Now, using this producer, we can send a message to the topic.

Generate the message:

var record = new ProducerRecord<>("my-topic", "message-key", "message-value");

and send it with the producer:

sendRecord(producer,record);

where:

private <K, V> void sendRecord(KafkaProducer<K, V> producer, ProducerRecord<K, V> record)
        throws ExecutionException, InterruptedException, TimeoutException {
    // Block for up to 30 seconds so the test fails fast if the broker is unreachable
    var result = producer.send(record).get(30, TimeUnit.SECONDS);
    assertThat(result.topic()).isEqualTo(record.topic());
    assertThat(result.hasOffset()).isTrue();
}
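
For the read side, here is a hedged, hypothetical consumer-properties helper mirroring getProducerProps() above (SCHEMA_REGISTRY_CONFIG is assumed to be the same statically imported constant):

private Properties getConsumerProps() {
    var properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "cucumber-tests");
    // Read from the beginning of the topic so the test sees its own messages
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    properties.setProperty(SCHEMA_REGISTRY_CONFIG, "http://localhost:8081");
    return properties;
}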
Inviting answered 31/5, 2023 at 16:38 Comment(0)
