I'm getting the error:
org.apache.kafka.common.errors.TimeoutException: Topic testtopic2 not present in metadata after 60000 ms.
When trying to produce to the topic in my local kafka instance on windows using Java. Note that the topic testtopic2 exists and I'm able produce messages to it using the windows console producer just fine.
Below the code that I'm using:
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Kafka_Producer {
public static void main(String[] args){
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
TestCallback callback = new TestCallback();
for (long i = 0; i < 100 ; i++) {
ProducerRecord<String, String> data = new ProducerRecord<String, String>(
"testtopic2", "key-" + i, "message-"+i );
producer.send(data, callback);
}
producer.close();
}
private static class TestCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
System.out.println("Error while producing message to topic :" + recordMetadata);
e.printStackTrace();
} else {
String message = String.format("sent message to topic:%s partition:%s offset:%s", recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
System.out.println(message);
}
}
}
}
Pom dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>