Kafka schema registry RestClientException: Unauthorized; error code: 401

I am trying to read data from a Kafka Avro topic using the Avro schema from the Confluent Schema Registry. I am using the io.confluent library, version 5.4.1. This is the entry in the Gradle file:

    compile (group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.1') {
        exclude group: 'org.apache.avro', module: 'avro'
    }

I receive the following error:

    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

This is the pipeline code:

    public PCollection<KV<String, GenericRecord>> apply(Pipeline p) {
        ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
                ConfluentSchemaRegistryDeserializerProvider.of(params.schemaUrl, "topic-value");

        PCollection<KafkaRecord<String, GenericRecord>> records = p.apply("GetDataFromKafka", KafkaIO.<String, GenericRecord>read()
                .withBootstrapServers(params.apiHost)
                .withTopics("topic")
                .withConsumerConfigUpdates(params.getConsumerProps())
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(valDeserializerProvider)
                .commitOffsetsInFinalize());

        return records.apply("TopicAndDataInput", MapElements.via(new SimpleFunction<KafkaRecord<String, GenericRecord>, KV<String, GenericRecord>>() {
            @Override
            public KV<String, GenericRecord> apply(KafkaRecord<String, GenericRecord> input) {
                String topic = input.getTopic();
                GenericRecord data = input.getKV().getValue();
                return KV.of(topic, data);
            }
        }));
    }

What am I missing here? Could someone point me in the right direction? Thanks in advance.

This is the function that builds the consumer properties:

    public Map<String, Object> getConsumerProps() {
        Map<String, Object> props = new HashMap<> ();

        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("retry.backoff.ms", 500);
        props.put("max.partition.fetch.bytes", 8388608);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "registry_key:secret");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("security.protocol", "SASL_SSL");

        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='" + apiKey + "' password='" + apiSecret + "';");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }

I also tried the following properties and still get the same Unauthorized error:

    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("schema.registry.basic.auth.user.info", "<registry key>:<value>");
    props.put("schema.registry.url", schemaUrl);

Eleanoraeleanore answered 1/2, 2021 at 16:7 Comment(5)
"Unauthorized; error code: 401" is pretty clear cut. Have you checked the Schema Registry credentials directly, using something like curl? - Lawry
curl -s -u <registry_key:secret> -X GET https://<host_name>.confluent.cloud/subjects returns a valid response. - Eleanoraeleanore
Have you imported ConfluentSchemaRegistryDeserializerProvider from Apache Beam, and did you mean to do that? - Rolfrolfe
Yes, exactly. I have imported org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider. - Eleanoraeleanore
Have you found a solution for that? - Bulky
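
For anyone who wants to repeat the curl check from inside the JVM, here is a minimal sketch of the same request. It assumes Java 11+ (for `java.net.http`); the class name `RegistryAuthCheck`, its method names, and the `registry.example.invalid` URL are made up for illustration and are not part of any Confluent or Beam API. It only builds the request that `curl -u key:secret` would send, so that the Basic auth header can be inspected.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RegistryAuthCheck {

    // Builds the value of the HTTP Authorization header for Basic auth:
    // "Basic " followed by base64("<key>:<secret>").
    public static String basicAuthHeader(String apiKey, String apiSecret) {
        String pair = apiKey + ":" + apiSecret;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    // Builds (but does not send) a GET request for <registryUrl>/subjects,
    // the same endpoint used in the curl command above.
    public static HttpRequest subjectsRequest(String registryUrl, String apiKey, String apiSecret) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects"))
                .header("Authorization", basicAuthHeader(apiKey, apiSecret))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Placeholder URL and credentials; replace with real values before sending.
        HttpRequest req = subjectsRequest("https://registry.example.invalid", "registry_key", "secret");
        System.out.println(req.method() + " " + req.uri());
    }
}
```

Sending the request with `HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString())` and checking `statusCode()` should show whether the registry accepts the key pair on its own: a 200 here combined with a 401 in the pipeline would suggest the credentials are fine but are not reaching the registry client.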

I had the same issue with the 5.3.0 libraries. I resolved it by updating to:

'org.apache.avro:avro:1.10.2'
'io.confluent:kafka-schema-registry-client:6.2.0'

I am using the following properties to connect to Schema Registry:

"schema.registry.url": "<URL>"
"schema.registry.basic.auth.credentials.source": "USER_INFO"
"schema.registry.basic.auth.user.info": "<API_KEY>:<API_SECRET>"
Mccleary answered 24/8, 2021 at 8:27 Comment(0)
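
As a rough illustration, the three properties above could be collected in a small helper like the one below. The class name `SchemaRegistryProps`, the `build` method, and the example URL are hypothetical; only the property keys and the `USER_INFO` value come from this answer.

```java
import java.util.HashMap;
import java.util.Map;

public class SchemaRegistryProps {

    // Hypothetical helper: gathers the Schema Registry settings listed in the
    // answer above. The user-info value has the form "<API_KEY>:<API_SECRET>".
    public static Map<String, Object> build(String url, String apiKey, String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", url);
        props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
        props.put("schema.registry.basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values only; use the Schema Registry key pair here,
        // not the Kafka cluster key pair.
        Map<String, Object> props = build("https://registry.example.invalid", "KEY", "SECRET");
        System.out.println(props.keySet());
    }
}
```

Keeping these entries in their own map makes it easier to see which key pair is used for the registry and which for the brokers. In the Beam pipeline from the question the provider is created from a URL and a subject only, so it may be that entries passed through `withConsumerConfigUpdates` never reach the registry client; that is an assumption rather than something confirmed in this thread. It is worth checking the Javadoc of your Beam version for an overload of `ConfluentSchemaRegistryDeserializerProvider.of` that accepts a map of Schema Registry client settings.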

It seems you should add the Schema Registry's own API key and secret (not the Kafka cluster's) as

props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");

to the properties. (from https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html)

Intelsat answered 4/2, 2021 at 21:42 Comment(2)
I haven't had the chance to try it out yet. I will try it and report back whether it works or not. Thanks. - Eleanoraeleanore
I updated the question with the properties you mentioned, and it still throws the same error. I am wondering whether something more than .withConsumerConfigUpdates is needed to pass these properties. - Eleanoraeleanore
