SpringBoot Embedded Kafka to produce Event using Avro Schema
Asked Answered
Q

2

6

I have created the below test class to produce an event using AvroSerializer.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {


    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}

But when I send an event using kafkaTemplate().send(appEventsTopic, applicationEvent); I get the exception below.

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)

When I use MockSchemaRegistryClient, why is it trying to look up the schema?

Quechua answered 11/3, 2021 at 3:14 Comment(2)
What version of schema-registry dependency are you using?Carrageen
Edited my answer, have a lookElena
A
9

The Confluent schema registry client supports a "mock" pseudo-protocol:

schema.registry.url=mock://localhost.something

Basically, anything with the mock:// prefix will do the job.

Refer to the source code for AbstractKafkaAvroSerDeConfig:

  public static final String SCHEMA_REGISTRY_URL_DOC =
      "Comma-separated list of URLs for schema registry instances that can be used to register "
      + "or look up schemas. "
      + "If you wish to get a connection to a mocked schema registry for testing, "
      + "you can specify a scope using the 'mock://' pseudo-protocol. For example, "
      + "'mock://my-scope-name' corresponds to "
      + "'MockSchemaRegistry.getClientForScope(\"my-scope-name\")'.";

Also set:

auto.register.schemas=true
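Putting the two settings together, the producer setup for the test could look roughly like this; the configuration class name and scope name are illustrative, and ApplicationEvent is the Avro-generated type from the question:

import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

@TestConfiguration
public class AvroTestProducerConfig {

    @Bean
    public ProducerFactory<String, ApplicationEvent> producerFactory(EmbeddedKafkaBroker broker) {
        Map<String, Object> props = KafkaTestUtils.producerProps(broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        // mock:// routes the serializer to an in-memory registry, no HTTP calls needed
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://my-scope-name");
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate(
            ProducerFactory<String, ApplicationEvent> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}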
Areca answered 2/3, 2022 at 14:42 Comment(1)
Thanks! That MockSchemaRegistry.getClientForScope("my-scope-name") was crucial in getting my test to talk to the consumers in the application context.Gwinn
E
0

You are configuring the producer not to auto-register new schemas when producing a message, so it only tries to fetch the schema from the schema registry (SR) and does not find it there.


Also, I do not see you setting the schema registry URL, so I guess it is taking the default value.
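If you would rather keep auto.register.schemas=false, one workaround (a sketch, not part of the original answer) is to register the schema on the mock client up front, for example by adjusting the mockSchemaRegistryClient() bean from the question. This assumes ApplicationEvent is an Avro-generated class (so it exposes getClassSchema()), that the default <topic>-value subject naming strategy is in use, and a recent client version where register(String, ParsedSchema) is available:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

@Bean
MockSchemaRegistryClient mockSchemaRegistryClient() throws Exception {
    MockSchemaRegistryClient client = new MockSchemaRegistryClient();
    // Register the value schema under the subject the serializer will look up,
    // i.e. "<topic>-value" with the default TopicNameStrategy.
    client.register(appEventsTopic + "-value",
            new AvroSchema(ApplicationEvent.getClassSchema()));
    return client;
}

On older client versions you would pass the raw org.apache.avro.Schema to register() instead of wrapping it in AvroSchema.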


To your question: the mock imitates the work of a real schema registry, but it has clear disadvantages:

/**
 * Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT
 * thread safe. Schema data is stored in memory and is not persistent or shared across instances.
 */

You may look at the source for more information:

https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java#L47

Elena answered 19/7, 2021 at 15:4 Comment(0)
