I am trying to read data from a kafka avro topic using the avro schema from the confluent client registry. I am using io.confluent
library version 5.4.1
.
This is the entry in the gradle file
compile (group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.1') {
exclude group: 'org.apache.avro', module: 'avro'
}
I receive the following error.
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
public PCollection<KV<String, GenericRecord>> apply(Pipeline p) {
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
ConfluentSchemaRegistryDeserializerProvider.of(params.schemaUrl, "topic-value");
PCollection<KafkaRecord<String, GenericRecord>> records = p.apply("GetDataFromKafka", KafkaIO.<String, GenericRecord>read()
.withBootstrapServers(params.apiHost)
.withTopics("topic")
.withConsumerConfigUpdates(params.getConsumerProps())
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(valDeserializerProvider)
.commitOffsetsInFinalize());
return records.apply("TopicAndDataInput", MapElements.via(new SimpleFunction<KafkaRecord<String, GenericRecord>, KV<String, GenericRecord>>() {
@Override
public KV<String, GenericRecord> apply(KafkaRecord<String, GenericRecord> input) {
String topic = input.getTopic();
GenericRecord data = input.getKV().getValue();
return KV.of(topic, data);
}
}));
}
What am I missing here? Could someone point me in the right direction. Thanks in advance.
This is the function to get consumer properties
public Map<String, Object> getConsumerProps() {
Map<String, Object> props = new HashMap<> ();
props.put("group.id", groupId);
props.put("auto.offset.reset", "earliest");
props.put("retry.backoff.ms",500);
props.put("max.partition.fetch.bytes", 8388608);
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "registry_key:secret");
props.put("ssl.endpoint.identification.algorithm","https");
props.put("security.protocol","SASL_SSL");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ apiKey +"' password='" + apiSecret +"';");
props.put("sasl.mechanism","PLAIN");
return props;
}
Tried also with the following props and still get the same unauthorized error.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("schema.registry.basic.auth.user.info", "<registry key>:<value>");
props.put("schema.registry.url", schemaUrl);
Unauthorized; error code: 401
is pretty clear cut. Have you checked the Schema Registry credentials directly using something like likecurl
? – LawryConfluentSchemaRegistryDeserializerProvider
from Apache Beam, and did you mean to do that? – Rolfrolfe