How to find the schema id from schema registry used for avro records, when reading from kafka consumer
Asked Answered
W

1

7

We use schema registry for storing schemas, and messages are serialised to avro and pushed to kafka topics.

Wanted to know, when reading data from consumer, how to find the schema id, for which the avro record is serialised. We require this schema id, to track changes whether a new column is added to the table. If new columns are added or deleted, a new schema id will be generated in schema registry, and how to get that id in consumer.

consumer = KafkaConsumer(bootstrap_servers = conf['BOOTSTRAP_SERVERS'],
                        auto_offset_reset = conf['AUTO_OFFSET'],
                        enable_auto_commit = conf['AUTO_COMMIT'],
                        auto_commit_interval_ms = conf['AUTO_COMMIT_INTERVAL']
                        )
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)

From above code, message.key prints the key for that particular record, and how do we find the corresponding schema id which is used by consumer to deserialise the record.?

curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2

{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"

Here from consumer, we wanted to get the id value "id":33

Please suggest on this.

Wellbeloved answered 6/5, 2020 at 9:14 Comment(0)
S
1

What you can actually do, is to get the latest schema id for the given subject of the topic:

Using confluent-kafka-python

from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

sr = CachedSchemaRegistryClient({
    'url': 'http://localhost:8081',
    'ssl.certificate.location': '/path/to/cert',  # optional
    'ssl.key.location': '/path/to/key'  # optional
})

value_schema = sr.get_latest_schema("helpkit_internal.helpkit_support.agents-value")[1]
key_schema= sr.get_latest_schema("helpkit_internal.helpkit_support.agents-key")[1]

Using SchemaRegistryClient

Getting schema by subject name

from schema_registry.client import SchemaRegistryClient


sr = SchemaRegistryClient('localhost:8081')
my_schema = sr.get_schema(subject='shelpkit_internal.helpkit_support.agents-value', version='latest')
Stupefy answered 6/5, 2020 at 9:21 Comment(5)
Thanks for the response. But getting the latest version will not help in my scenario. We are working on real time ingestion, can u help me achieve the below one, Say, kafka producer is running for a table, and table had changes after 3000 records, basically a new column is added in source after 3000 records. My kafka producer would have pushed 3000 records with schema id as 1, and from 3001 record as schema id 2. In my consumer, i want to consume from first record, in this case i cannot take latest version of schema id, because the data will not be there for column added after 3000 records.Wellbeloved
@HemanthKumar What is the compatibility mode for your schemas?Stupefy
Compatibility mode is Forward. Since there will be addition of new columns happening frequently.Wellbeloved
@HemanthKumar, did you find a way to get the schema id?Auditory
@HemanthKumar Did you find a way to get the Schema ID ?Chivy

© 2022 - 2025 — McMap. All rights reserved.