org.apache.kafka.common.errors.TimeoutException: Topic not present in metadata after 60000 ms

I'm getting the error:

 org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.

When trying to produce to the topic in my local Kafka instance on Windows using Java. Note that the topic testtopic2 exists and I'm able to produce messages to it using the Windows console producer just fine.

Below is the code that I'm using:

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class Kafka_Producer {

    public static void main(String[] args){

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        TestCallback callback = new TestCallback();
        for (long i = 0; i < 100 ; i++) {
            ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                    "testtopic2", "key-" + i, "message-"+i );
            producer.send(data, callback);
        }

        producer.close();
    }


    private static class TestCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                System.out.println("Error while producing message to topic :" + recordMetadata);
                e.printStackTrace();
            } else {
                String message = String.format("sent message to topic:%s partition:%s  offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                System.out.println(message);
            }
        }
    }

}

Pom dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

Output of list and describe (screenshots in the original post):

[output of list topics]

[output of describe testtopic2]

Stoa answered 2/9, 2020 at 22:39 Comment(2)
Can you show the output of the topic creation and list topics commands? – Manor
@Manor Sure, I have added the output of list and describe to the question. I set the replication factor to 1, as I'm working with a single broker instance, and the number of partitions to 3. – Stoa

I was having this same problem today. I'm a newbie at Kafka and was simply trying to get a sample Java producer and consumer running. I was able to get the consumer working, but kept getting the same "topic not present in metadata" error as you with the producer.

Finally, out of desperation, I added some code to my producer to dump the topics. When I did this, I got runtime errors caused by missing classes from the jackson-databind and jackson-core libraries. After adding those dependencies, I no longer got the "topic not present" error. I removed the topic-dumping code I had temporarily added, and it still worked.
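
For reference, here is a minimal sketch of the kind of topic-dumping code described above (an illustration, not the original code; producer.partitionsFor() is the standard way to force a metadata fetch):

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.PartitionInfo;

    public class TopicDump {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            // partitionsFor() forces a metadata fetch, so classpath problems
            // (such as the missing Jackson classes described above) surface
            // here as runtime errors instead of a send() timeout.
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                List<PartitionInfo> partitions = producer.partitionsFor("testtopic2");
                partitions.forEach(System.out::println);
            }
        }
    }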

Ivanivana answered 11/9, 2020 at 0:32 Comment(4)
@Bob Doobs Can you please share the code changes made for this solution? – Nembutal
Kafka 2.8.0, I think, was missing Jackson classes. It's recommended to downgrade. – Manor
I was having this same issue in a local environment running the Landoop image in Docker. After spending a lot of time researching the root cause, it turned out I was trying to post a message to a non-existent partition. When I omitted the partition in the settings, it worked fine. – Scuffle
We built a LogAppender for logback in a Spring setup, and it kept getting stuck at the first log message in the boot process. Adding a seemingly useless List<PartitionInfo> result = producer.partitionsFor(topic); after the creation of the KafkaProducer solved it. Probably a race condition in the constructor of the KafkaProducer - a standalone test with the same properties worked just fine. – Masorete

It might also be caused by a nonexistent partition.

e.g. if you have a single partition [0] and your producer tries to send to partition [1], you'll get the same error. The topic exists in this case, but not the partition.
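
As an illustration (a sketch with made-up values; this also answers the comment below about how a partition gets specified), the partition only enters the picture when you use the constructor overload that takes one:

    // The four-argument constructor pins the record to a partition:
    // (topic, partition, key, value). If "testtopic2" only has partition 0,
    // sending to partition 1 times out with "not present in metadata".
    ProducerRecord<String, String> explicit =
            new ProducerRecord<>("testtopic2", 1, "key-1", "message-1");

    // Omitting the partition lets the producer's partitioner choose a
    // partition that actually exists.
    ProducerRecord<String, String> implicit =
            new ProducerRecord<>("testtopic2", "key-1", "message-1");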

Plexus answered 17/3, 2022 at 10:22 Comment(1)
How do you specify which partition to send to? – Cinchonism

This error can also appear because the destination Kafka instance has "died" or the URL to it is wrong.

In such a case, the thread that sends the message to Kafka will block for max.block.ms, which defaults to exactly 60000 ms.

You can check whether it is this property at work by passing a different value:

Properties props = new Properties();
// ... (other properties, as above)
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30000); // 30 seconds, or any other value of your choice

If the TimeoutException is then thrown after your specified time, you should check whether your URL to Kafka is correct and whether the Kafka instance is alive.

Intrinsic answered 5/8, 2021 at 15:58 Comment(0)

First off, I want to say thanks to Bobb Dobbs for his answer; I was also struggling with this for a while today. I just want to add that the only dependency I had to add was jackson-databind. It's the only dependency in my project besides kafka-clients.
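
For example, alongside kafka-clients in the pom (a sketch; the version shown is illustrative, so pick one compatible with your kafka-clients):

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <!-- illustrative version; use one matching your kafka-clients -->
        <version>2.10.5</version>
    </dependency>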

Update: I've learned a bit more about what's going on. kafka-clients sets the scope of its jackson-databind dependency to "provided", which means it expects the dependency to be provided at runtime by the JDK or a container. See this article for more details on the provided Maven scope.

This scope is used to mark dependencies that should be provided at runtime by JDK or a container, hence the name. A good use case for this scope would be a web application deployed in some container, where the container already provides some libraries itself.

I'm not sure of the exact reasoning for setting its scope to provided, except that maybe this is a library people would normally want to provide themselves, to keep it on the latest version for security fixes, etc.

Pejsach answered 11/9, 2020 at 1:33 Comment(5)
Try adding jackson-databind to your project dependencies. If that doesn't work, try also adding jackson-core. If that doesn't work, I'm not sure what else to try. – Pejsach
Does this exception appear on every occurrence, or only sometimes? – Jotting
@Jotting If I remember correctly, the exception happened every time I tried to produce a message to a Kafka topic. – Pejsach
@Bradon jackson-core is a transitive dependency of jackson-databind, so how can explicitly adding jackson-core help here? Please explain. – Endocentric
I don't know if it would help, then. I suppose some other dependency could require a later version of jackson-core (via the "provided" scope) than the one the jackson-databind you're using brings in, but I don't know if that's what's happening in your case. – Pejsach

I saw this issue when someone on my team changed the value of the spring.kafka.security.protocol config (we are using Spring on my project). Previously it had been "SSL", but it was updated to PLAINTEXT. In higher environments, where we connect to a cluster that uses SSL, we saw the error the OP ran into.

Why we saw this error as opposed to an SSL or authentication error is beyond me, but if you run into it, it may be worth double-checking your client's authentication configs for your Kafka cluster.
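
A minimal sketch of the client-side settings worth double-checking (a fragment; CommonClientConfigs and SslConfigs come from kafka-clients, and the values are placeholders):

    // import org.apache.kafka.clients.CommonClientConfigs;
    // import org.apache.kafka.common.config.SslConfigs;
    Properties props = new Properties();
    // Must match the broker listener you connect to: PLAINTEXT against an
    // SSL listener tends to surface as this timeout rather than an SSL error.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<truststore-password>");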

Ramberg answered 18/10, 2021 at 0:56 Comment(2)
I had the same issue; I was able to solve it by setting "security.protocol": "ssl" for the producer. – Ruthi
Thanks, I spent hours looking at my cluster configs. In the end, I had a typo in the "security.protocol" key in my producer config. This error message didn't help at all... – Antiseptic

I also had a similar issue trying this out in my local environment on my MacBook. It was quite frustrating, and I tried a few approaches:

  1. Stopped ZooKeeper, stopped Kafka, restarted ZK and Kafka. (Didn't help.)
  2. Stopped ZK, deleted the ZK data directory, deleted Kafka's logs.dirs, and restarted Kafka. (Didn't help.)
  3. Restarted my MacBook - this did the trick.

I have used Kafka in production for more than 3 years but never faced this problem on the cluster; it happened only in my local environment. However, restarting fixes it for me.

Dotty answered 20/5, 2021 at 20:53 Comment(2)
Apple Silicon? – Tarkany
Yes! It's Apple Silicon (M1). – Dotty

This error is only a surface symptom; it can be triggered by the following underlying conditions.

  1. First, and most commonly, the Kafka producer config is wrong: check that BOOTSTRAP_SERVERS_CONFIG in your Kafka properties points to the correct server address (a quick connectivity check is sketched after this list).
  2. In a Docker environment, check your port mapping.
  3. Check whether the firewall has opened port 9092 on the server where the broker is located.
  4. If your broker runs over SSL, check your producer config for SSL_TRUSTSTORE_LOCATION_CONFIG, SECURITY_PROTOCOL_CONFIG, and SSL_TRUSTSTORE_TYPE_CONFIG. Some brokers serve both SSL and PLAINTEXT listeners, so make sure you are connecting to the port you need.
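
One way to tell "broker unreachable" apart from "topic missing" is a quick AdminClient probe; a minimal sketch, assuming the same bootstrap address your producer uses:

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class BrokerProbe {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Fails fast if the address, port mapping, firewall, or
                // security protocol is wrong, instead of blocking for
                // max.block.ms inside send().
                System.out.println("Cluster id: "
                        + admin.describeCluster().clusterId().get(10, TimeUnit.SECONDS));
                System.out.println("Topics: "
                        + admin.listTopics().names().get(10, TimeUnit.SECONDS));
            }
        }
    }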
Intermeddle answered 5/1, 2022 at 3:17 Comment(1)
I had missed the SSL protocol, and unfortunately there was no exception thrown indicating that. – Boice
  1. I created a topic with a single partition and tried to populate it as though it had 10 partitions, and I got this issue.

  2. I deleted the topic using the kafka-topics.sh script but didn't wait long enough for the cleanup to finish before repopulating the topic. Looking at the topic metadata, it had one partition, and I got exactly the same issue as in the first part of this answer (a sketch for verifying the partition count programmatically follows).
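
A sketch of checking the partition count before producing (names here are illustrative; AdminClient is the standard client API):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.TopicDescription;

    public class PartitionCheck {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                TopicDescription desc = admin
                        .describeTopics(Collections.singleton("testtopic2"))
                        .values().get("testtopic2").get();
                // If this prints 1 but the producer targets partitions 1..9,
                // every send to a missing partition times out as described.
                System.out.println("Partitions: " + desc.partitions().size());
            }
        }
    }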

Extemporaneous answered 27/9, 2020 at 21:48 Comment(0)

You may want to check your producer properties for metadata.max.idle.ms.

The producer caches topic metadata for as long as the value configured above. Any change to the metadata on the broker side will not be visible on the client (producer) immediately. Restarting the producer should, however, read the metadata afresh at startup.
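
A sketch of overriding it on the producer (the metadata.max.idle.ms property was added around Kafka 2.5; the value here is arbitrary):

    Properties props = new Properties();
    // Default is 300000 ms (5 minutes); cached topic metadata older than
    // this is refreshed, so broker-side changes show up within this window.
    props.put("metadata.max.idle.ms", "180000");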

Update: check the default value here: https://kafka.apache.org/documentation.html#metadata.max.idle.ms

Pournaras answered 2/10, 2020 at 0:14 Comment(0)

Note that this can also happen because the versions of kafka-clients and Spring are not compatible.

More info in the "Kafka Client Compatibility" matrix at https://spring.io/projects/spring-kafka

Appreciation answered 8/8, 2021 at 18:49 Comment(0)

kafka-topics --bootstrap-server 127.0.0.1:9092 --topic my_first --create --partitions 3

First, create the topic in Kafka using the above command.

Here my_first is the topic name.

Chader answered 19/12, 2021 at 14:1 Comment(1)
This shouldn't be needed. You can use Spring beans to create topics in code. – Manor
  1. Add the two dependencies to the pom: kafka-streams and spring-kafka.
  2. In application.yml (or properties):

    spring:
      kafka:
        bootstrap-servers: <service_url/bootstrap_server_url>
        producer:
          bootstrap-servers: <service_url/bootstrap_server_url>
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: <your_consumer_id>

  3. Add the @EnableKafka annotation to your @SpringBootApplication class.

This will make it work without any errors.
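
With that in place, a minimal send through the auto-configured KafkaTemplate might look like this (a sketch; the class and topic names are made up):

    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    @Component
    public class ExampleSender {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public ExampleSender(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }

        public void send() {
            // Uses the bootstrap-servers and serializers from application.yml.
            kafkaTemplate.send("testtopic2", "key-1", "message-1");
        }
    }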

Leucoma answered 17/8, 2022 at 7:59 Comment(0)

I was facing the same issue. It can happen when your bootstrap or registry URL is wrong or unreachable.

Detective answered 23/9, 2022 at 12:30 Comment(1)
So, how did you fix it? What address did you use? – Manor

In case you came here with the same error while setting up integration tests with Testcontainers: this can happen because of a mismatch between the port Kafka uses inside the container and the port exposed outside. Make sure the started bootstrap server's port is correctly mapped to the exposed port you use in your tests.

In my case, I just replaced the properties-file entries after the Kafka container started:

KafkaContainer kafka = new KafkaContainer(...);
kafka.start();

// Use the container's actual mapped bootstrap address, not a hard-coded port.
String brokers = kafka.getBootstrapServers();
TestPropertySourceUtils.addInlinedPropertiesToEnvironment(context,
    "spring.kafka.bootstrap-servers=" + brokers,
    "spring.kafka.producer.bootstrap-servers=" + brokers,
    "spring.kafka.consumer.bootstrap-servers=" + brokers
);

I spent quite some time before I figured it out; hope this helps someone.

Dareen answered 29/9, 2022 at 15:48 Comment(1)
Alternatively, you can use the property spring.kafka.properties.bootstrap.servers, which sets all three in one shot. – Hebetate

I was having the same problem, and it was because of a wrong config. Here's my producer configuration that worked. Replace the ${} placeholders with your own values, and don't forget to set all the properties:

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ${servers});
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("enable.auto.commit", "false");
    props.put("auto.offset.reset", "earliest");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("basic.auth.user.info", "${user}:${pass}");
    props.put("sasl.kerberos.service.name", "kafka");
    props.put("auto.register.schemas", "false");
    props.put("schema.registry.url", "${https://your_url}");
    props.put("schema.registry.ssl.truststore.location", "client_truststore.jks");
    props.put("schema.registry.ssl.truststore.password", "${password}");

    KafkaProducer<String, ClassEvent> producer = new KafkaProducer<>(props);

    ClassEvent event = getEventObjectData();

    ProducerRecord<String, ClassEvent> record = new ProducerRecord<>(args[0], event);
    producer.send(record);
    producer.close();

Execution from the cluster:

java -Djava.security.auth.login.config=${jaas.conf} -cp ${your-producer-example.jar} ${your.package.class.ClassName} ${topic}

Hope it helps

Runt answered 8/2, 2023 at 12:37 Comment(0)

In the case of reading a stream from one topic and writing it to another (e.g. with Spark Structured Streaming), a possible solution could be:

<dataset being read>.drop("partition")

Explanation: each row in the read dataframe carries the source's partition column. If the source topic has more partitions than the destination topic, the write will try to put each row into the same-numbered partition on the destination. If that partition doesn't exist on the destination topic, you get this error.

I was also able to obtain a more comprehensive version of the error when deployed in cluster mode: Partition 22 of topic with partition count 3 is not present in metadata after 60000 ms.

The solution is either to drop the partition column and let Kafka choose the partition itself, or to replace the original partition number with a valid one (for example, the original modulo the destination's partition count).
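
A minimal Java sketch of the first option, assuming Spark Structured Streaming's Kafka source and sink (topic names and the checkpoint path are placeholders):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class CopyTopic {
        public static void main(String[] args) throws Exception {
            SparkSession spark = SparkSession.builder()
                    .appName("copy-topic")
                    .getOrCreate();

            Dataset<Row> in = spark.readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "source-topic")
                    .load();

            // Drop the source's partition column so the Kafka sink lets the
            // destination's partitioner pick a partition that actually exists.
            in.drop("partition")
                    .writeStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("topic", "destination-topic")
                    .option("checkpointLocation", "/tmp/checkpoints/copy-topic")
                    .start()
                    .awaitTermination();
        }
    }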

Underset answered 20/3, 2023 at 15:46 Comment(0)

I faced the same issue: the producer simply wasn't able to connect to the bootstrap server. My problem was related to the JKS truststore for the SSL configuration; once I configured it correctly, everything started working as usual.

Quiche answered 24/4, 2023 at 11:28 Comment(0)

I faced the same issue and found out that the Kafka cluster IPs were not whitelisted for the application deployed in the cloud. It worked after whitelisting the IPs.

Fraga answered 10/5, 2024 at 2:44 Comment(0)
