I have created a sample application to check my producer's code. My application runs fine when I'm sending data without a partitioning key. But, on specifying a key for data partitioning I'm getting the error:
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
for both consumer and producer. I have searched a lot on the internet, they have suggested to verify kafka.acl settings. I'm using kafka on HDInsight and I have no idea how to verify it and solve this issue.
My cluster has following configuration:
- Head Node: 2
- Worker Node:4
- Zookeeper: 3
MY producer code:
public static void produce(String brokers, String topicName) throws IOException{
// Set properties used to configure the producer
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// specify the protocol for Domain Joined clusters
//To create an Idempotent Producer
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions();
// So we can generate random sentences
Random random = new Random();
String[] sentences = new String[] {
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature",
};
for(String sentence: sentences){
// Send the sentence to the test topic
try
{
String key=sentence.substring(0,2);
producer.beginTransaction();
producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
}
catch (Exception ex)
{
System.out.print(ex.getMessage());
throw new IOException(ex.toString());
}
producer.commitTransaction();
}
}
Also, My topic consists of 3 partitions with replication factor=3