Kafka Streams with Avro in Java: Missing required configuration "schema.registry.url" which has no default value

I have the following configuration for my Kafka Streams application:

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
    config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());

    // Exactly once processing!!
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");

And I got the following error:

    Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
        at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)

I have tried replacing the line

    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");

with

    config.put("schema.registry.url","http://localhost:8081");

but I get the same error.

I followed the instructions from this URL when preparing my Streams application.

Any suggestions?

Censor answered 15/5, 2018 at 11:13 Comment(1)
From the linked example, where is your Collections.singletonMap?Vendor

If your keys and values are in Avro format, the following lines should do the trick:

    config.put("key.converter.schema.registry.url", "http://localhost:8081");
    config.put("value.converter.schema.registry.url", "http://localhost:8081");

If this doesn't work, you can configure the Serdes explicitly. For example, with Avro keys and values:

    final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                     "http://localhost:8081");
    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
    keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false); // false for record values

    StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> textLines =
        builder.stream(keyGenericAvroSerde, valueGenericAvroSerde, "my-avro-topic");
    // Do whatever you like
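
Serdes that you instantiate yourself are not configured from the properties handed to Kafka Streams, so the registry URL has to reach them through `configure(...)`. One way to avoid writing the URL twice is to derive the serde configuration map from the Streams properties. The following is only a minimal sketch using `java.util`; the class and method names (`SerdeConfigSketch`, `serdeConfigFrom`) are hypothetical and not part of Kafka or the Confluent libraries:

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Kafka or the Confluent libraries.
class SerdeConfigSketch {

    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Copies the registry URL out of the Streams properties so the same value
    // can be passed to serde.configure(...) for manually created serdes.
    static Map<String, String> serdeConfigFrom(Properties streamsConfig) {
        String url = streamsConfig.getProperty(SCHEMA_REGISTRY_URL);
        if (url == null) {
            throw new IllegalArgumentException(
                "Missing \"" + SCHEMA_REGISTRY_URL + "\" in the Streams properties");
        }
        return Collections.singletonMap(SCHEMA_REGISTRY_URL, url);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(SCHEMA_REGISTRY_URL, "http://localhost:8081");

        Map<String, String> serdeConfig = serdeConfigFrom(config);
        System.out.println(serdeConfig);
        // Then, for each serde created in your own code:
        //   keySerde.configure(serdeConfig, true);    // record keys
        //   valueSerde.configure(serdeConfig, false); // record values
    }
}
```

Failing fast when the property is absent surfaces the problem at startup rather than inside the serializer's own `configure` call.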
Countrydance answered 15/5, 2018 at 12:17 Comment(5)
Thanks so much, Giorgos. It turns out that once I added the line "final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "localhost:8081");" everything worked fine. I thought this line was only an optional setting, but it turns out to be required~Censor
Follow up: Configs from StreamsConfig are only passed to the Serdes that KafkaStreams creates internally. In your case, you create the Serdes in your own code, and thus it's your responsibility to provide the correct configuration. Cf. docs.confluent.io/current/streams/developer-guide/…Stegall
Does anyone know how to configure the same in a Spring Boot application? Maybe directly via application.yml?Inexistent
@Inexistent check my answer below https://mcmap.net/q/1157701/-kafka-stream-with-avro-in-java-schema-registry-url-quot-which-has-no-default-valueVilliers
Why are you using Kafka Connect converter properties for Kafka Streams?Vendor

If you are using an application.yml, you can set the property like this:

    spring:
      kafka:
        properties:
          schema.registry.url: your-schema-registry-url
        consumer:
          auto-offset-reset: latest
          group-id: simple-consumer

I found it in a tutorial on the Confluent blog.

Chantry answered 29/7, 2021 at 11:16 Comment(2)
It logs a warning: "The configuration 'schema.registry.url' was supplied but isn't a known config." There's an answer saying that the warning goes away if you change the position of the property (placing it after the producer property). That suggestion doesn't work for me.Villiers
The warning appears because the AdminClient config doesn't know about that property. You can move it under consumer.properties to silence it.Vendor

The accepted answer is outdated: with StreamsBuilder, the serdes are passed through Consumed and Produced rather than directly to stream().
The following snippet worked for me (the example just pipes one topic to another without any change).

    final Map<String, String> serdeConfig = Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
    final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
    keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
    final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false); // false for record values

    StreamsBuilder builder = new StreamsBuilder();

    builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde))
            .peek((k, v) -> log.info("Processed a new record"))
            .to(outputTopic, Produced.with(keyGenericAvroSerde, valueGenericAvroSerde));
Og answered 17/1, 2022 at 1:37 Comment(1)
This seems to duplicate the bottom half of the accepted answer. If you're saying the Consumed / Produced objects are needed, better to edit / comment the postVendor
