Apache Kafka LEADER_NOT_AVAILABLE
Asked Answered
E

2

6

I'm running into an issue with apache Kafka that I don't understand . I subscribe to a topic in my broker called "topic-received" . This is the code :

protected String readResponse(final String idMessage) {
    if (props != null) {
        kafkaClient = new KafkaConsumer<>(props);
        logger.debug("Subscribed to topic-received");
        kafkaClient.subscribe(Arrays.asList("topic-received"));
        logger.debug("Waiting for reading : topic-received");
        ConsumerRecords<String, String> records =    
                       kafkaClient.poll(kafkaConfig.getRead_timeout());

        if (records != null) {
            for (ConsumerRecord<String, String> record : records) {
                logger.debug("Resultado devuelto : "+record.value());
                return record.value();
            }
        }
    }
    return null;
}

As this is happening, I send a message to "topic-received" from another point . The code is the following one :

private void sendMessageToKafkaBroker(String idTopic, String value) {
    Producer<String, String> producer = null;
    try {
        producer = new KafkaProducer<String, String>(mapProperties());
        ProducerRecord<String, String> producerRecord = new 
                   ProducerRecord<String, String>("topic-received", value);
        producer.send(producerRecord);
        logger.info("Sended value "+value+" to topic-received");
    } catch (ExceptionInInitializerError eix) {
        eix.printStackTrace();
    } catch (KafkaException ke) {
        ke.printStackTrace();
    } finally {
        if (producer != null) {
            producer.close();
        }
    }
}

First time I try , with topic "topic-received", I get a warning like this

"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient   :  
 Error while fetching metadata with correlation id 1 : {topic-  
 received=LEADER_NOT_AVAILABLE}"

But if I try again, to this topic "topic-received", works ok, and no warning is presented . Anyway, that's not useful for me, because I have to listen from a topic and send to a topic new each time ( referenced by an String identifier ex: .. 12Erw45-2345Saf-234DASDFasd )

Looking for LEADER_NOT_AVAILABLE in google , some guys talk about adding to server.properties the next lines :

host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1

But it's not working for me ( Don't know why ) .

I have tried to create the topic before all this process with the following code:

 private void createTopic(String idTopic) {
    String zookeeperConnect = "localhost:2181";
    ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000, 
    ZKStringSerializer$.MODULE$);
    ZkUtils zkUtils = new ZkUtils(zkClient, new 
    ZkConnection(zookeeperConnect),false);
    if(!AdminUtils.topicExists(zkUtils,idTopic)) {
        AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(), 
    null);
        logger.debug("Created topic "+idTopic+" by super user");
    }
    else{
        logger.debug("topic "+idTopic+" already exists");
    }
  }

No error, but still, it stays listening till the timeout.

I have reviewed the properties of the broker to check if there's any help, but I haven't found anything clear enough . The props that I have used for reading are :

    props = new Properties();
    props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());
    props.put("group.id",kafkaConfig.getGroupId());

and , for sending ...

   Properties props = new Properties();
    props.put("bootstrap.servers", kafkaConfig.getHost() + ":" + 
    kafkaConfig.getPort());
    props.put("group.id", kafkaConfig.getGroup_id());
    props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
    props.put("auto.commit.interval.ms", 
    kafkaConfig.getAuto_commit_interval_ms());
    props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
    props.put("key.deserializer", kafkaConfig.getKey_deserializer());
    props.put("value.deserializer", kafkaConfig.getValue_deserializer());
    props.put("key.serializer", kafkaConfig.getKey_serializer());
    props.put("value.serializer", kafkaConfig.getValue_serializer());

Any clue ? Why , the only way that I have to consume messages from the broker and from the topic, is repeating the request after an error ?

Thanks in advance

Escarole answered 27/9, 2016 at 7:46 Comment(4)
Consuming the message from windows console , I get a kafka.common.NotLeaderForPartitionExceptionEscarole
I am not sure, what the exact problem is... Furthermore, I guess it's not an error, but just a warning: WARN 13164, right?Mondragon
Yes . it's not exactly an error . It's a warning , but the consumer stands without reading any message . On the other hand, when I try to consume the broker from the console, I get kafka.common.NotLeaderForPartitionException . If I use a well stablished topic , I have no error, and this warning doesn't appear .Escarole
As the warning implies Kafka has not elected a leader yet. This is why you say there is no error in a well-established topic. Create the topic, then use the tool kafka-topics.sh with --describe option. If you see a leader in the output, you won't get any warnings.Egbert
P
15

This happens when trying to produce messages to a topic that doesn't exist

PLEASE NOTE: In some Kafka installations, the framework can automatically create the topic when it doesn't exist, that explains why you see the issue only once at the very beginning.

Prenomen answered 6/9, 2018 at 17:2 Comment(0)
S
1

This error appears when your Topic name doesn't exist.

To list all topics execute following command:

kafka-topics --list --zookeeper localhost:2181
Substantiate answered 7/8, 2019 at 4:26 Comment(1)
topic does exist in my caseCurhan

© 2022 - 2024 — McMap. All rights reserved.