Error while fetching metadata with correlation id 92 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
Asked Answered
P

5

8

I have created a sample application to check my producer's code. My application runs fine when I'm sending data without a partitioning key. But, on specifying a key for data partitioning I'm getting the error:

[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 37 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 38 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 39 : {myTest=UNKNOWN_TOPIC_OR_PARTITION}

for both consumer and producer. I have searched a lot on the internet, they have suggested to verify kafka.acl settings. I'm using kafka on HDInsight and I have no idea how to verify it and solve this issue.

My cluster has following configuration:

  1. Head Node: 2
  2. Worker Node:4
  3. Zookeeper: 3

MY producer code:

public static void produce(String brokers, String topicName) throws IOException{

    // Set properties used to configure the producer
    Properties properties = new Properties();
      // Set the brokers (bootstrap servers)
    properties.setProperty("bootstrap.servers", brokers);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // specify the protocol for Domain Joined clusters

    //To create an Idempotent Producer
    properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
    properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    properties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); 
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    producer.initTransactions();
    // So we can generate random sentences
    Random random = new Random();
    String[] sentences = new String[] {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature",
         };


    for(String sentence: sentences){
        // Send the sentence to the test topic
        try
        {
            String key=sentence.substring(0,2);
            producer.beginTransaction();
            producer.send(new ProducerRecord<String, String>(topicName,key,sentence)).get();
        }
        catch (Exception ex)
        {
          System.out.print(ex.getMessage());
            throw new IOException(ex.toString());
        }
        producer.commitTransaction();
    }
}

Also, My topic consists of 3 partitions with replication factor=3

Patricapatrice answered 14/4, 2020 at 20:54 Comment(0)
M
4

The error clearly states that the topic (or partition) you are producing to does not exist.

Ultimately, you will need to describe the topic (via CLI kafka-topics --describe --topic <topicName> or other means) to verify if this is true

Kafka on HDInsight and I have no idea how to verify it and solve this issue.

ACLs are only setup if you installed the cluster with them, but I believe you can still list ACLs via zookeper-shell or SSHing into one of Hadoop masters.

Manslaughter answered 15/4, 2020 at 0:33 Comment(3)
I have made sure that my topic exists, also the code runs perfectly fine when I send without a partition key. Why does it show error on adding a partition key?Patricapatrice
Not sure what you mean by "partition key". It would be the "record key", and even if you don't actually set one, then it is still sent as null. What partitioner are you using?Manslaughter
For the sake of completeness, the same happens when the topic is not created also.Cheslie
P
4

I made the replication factor less than the number of partitions and it worked for me. It sounds odd to me but yes, it started working after it.

Patricapatrice answered 15/4, 2020 at 7:11 Comment(1)
Partitions? You mean brokers?Manslaughter
A
1

This error occurs when topic is not present or partition is not present. In my case my amazon msk cluster configuration having auto.topic.cretion.enable as false. Customizing the configurations and setting it as a true resolved the issue.

Aniakudo answered 27/7, 2023 at 4:2 Comment(1)
You shouldn't change that. Just create a new topicManslaughter
C
0

I too had the same issue while creating a new topic. And when I described the topic, I could see that leaders were not assigned to the topic partitions.

Topic: xxxxxxxxx Partition: 0 Leader: none Replicas: 3,2,1 Isr: Topic: xxxxxxxxx Partition: 1 Leader: none Replicas: 1,3,2 Isr:

After some googling, figured out that this could happen when we some issue with controller broker, so restarted the controller broker.

And Everything worked as expected...!

Chopper answered 28/6, 2021 at 11:12 Comment(0)
S
0

If the topic exists but you're still seeing this error, it could mean that the supplied list of brokers is incorrect. Check the bootstrap.servers value, it should be pointing to the right Kafka cluster where the topic resides.

I saw the same issue and I have multiple Kafka clusters and the topic clearly exists. However, my list of brokers was incorrect.

Symbolize answered 7/2, 2023 at 19:55 Comment(0)

© 2022 - 2025 — McMap. All rights reserved.