I'm a noob to Kafka and Avro. So i have been trying to get the Producer/Consumer running. So far i have been able to produce and consume simple Bytes and Strings, using the following : Configuration for the Producer :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
Now this is all well and good, the problem comes when i'm trying to serialize a POJO. So , i was able to get the AvroSchema from the POJO using the utility provided with Avro. Hardcoded the schema, and then tried to create a Generic Record to send through the KafkaProducer the producer is now set up as :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
this is where the problem is : the moment i use KafkaAvroSerializer, the producer doesn't come up due to : missing mandatory parameter : schema.registry.url
I read up on why this is required, so that my consumer is able to decipher whatever the producer is sending to me. But isn't the schema already embedded in the AvroMessage? Would be really great if someone can share a working example of using KafkaProducer with the KafkaAvroSerializer without having to specify schema.registry.url
would also really appreciate any insights/resources on the utility of the schema registry.
thanks!