We use Schema Registry to store schemas, and messages are serialised to Avro and pushed to Kafka topics.
When reading data in the consumer, how can we find the schema ID with which an Avro record was serialised? We need this schema ID to track schema changes, such as a new column being added to the table. When columns are added or deleted, a new schema ID is generated in Schema Registry, and we would like to know how to get that ID in the consumer.
from kafka import KafkaConsumer  # kafka-python client

# conf is our own settings dict, loaded elsewhere
consumer = KafkaConsumer(
    bootstrap_servers=conf['BOOTSTRAP_SERVERS'],
    auto_offset_reset=conf['AUTO_OFFSET'],
    enable_auto_commit=conf['AUTO_COMMIT'],
    auto_commit_interval_ms=conf['AUTO_COMMIT_INTERVAL'],
)
consumer.subscribe(conf['KAFKA_TOPICS'])

for message in consumer:
    print(message.key)
In the code above, message.key prints the key of each record. How do we find the corresponding schema ID that the consumer would use to deserialise the record?
curl -X GET http://localhost:8081/subjects/helpkit_internal.helpkit_support.agents-value/versions/2
{"subject":"helpkit_internal.helpkit_support.agents-value","version":2,"id":33,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"helpkit_internal.helpkit_support.agents\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"user_id\"
From the consumer, we want to obtain this id value ("id":33) for each record.
Any suggestions would be appreciated.
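One approach that may work, assuming the producer uses Confluent's Avro serializer: in that wire format the serialised value starts with a single magic byte (0), followed by the schema ID as a 4-byte big-endian integer, and only then the Avro payload. The sketch below reads that 5-byte header. The helper name extract_schema_id is hypothetical (not part of kafka-python), and it assumes no value_deserializer is set on the consumer, so that message.value is still raw bytes.

```python
import struct

MAGIC_BYTE = 0   # first byte of the Confluent wire format
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def extract_schema_id(raw_value: bytes) -> int:
    """Return the schema ID embedded in a Confluent-framed message.

    Hypothetical helper: assumes the message was produced with the
    Confluent Avro serializer and has not been deserialised yet.
    """
    if raw_value is None or len(raw_value) < HEADER_SIZE:
        raise ValueError("message too short to carry a schema ID header")
    # ">bI" = big-endian, one signed byte, one unsigned 32-bit integer
    magic, schema_id = struct.unpack(">bI", raw_value[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id
```

Inside the consumer loop this would be called as extract_schema_id(message.value). If the key is also Avro-serialised through Schema Registry, the same header should be present on message.key, with its own schema ID.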