Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available
I am working through the Spring Boot Kafka example from Confluent. When I run the simple producer example, I get the error below. I am on a Windows machine with Ubuntu 14.04 LTS installed on Windows.

Note: even if I use localhost instead of 127.0.0.1, it still doesn't work from code.

2021-05-30 21:14:23.916  INFO 10300 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started bean '_org.springframework.integration.errorLogger'
2021-05-30 21:14:23.928  INFO 10300 --- [           main] c.e.demo.HelloWorldKafkaApplication      : Started HelloWorldKafkaApplication in 2.619 seconds (JVM running for 3.694)
2021-05-30 21:14:23.931  INFO 10300 --- [           main] com.example.demo.KafkaProducerService    : Producing Message- Key: 1, Value: {"name": "John", "age": 48}
2021-05-30 21:14:23.970  INFO 10300 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = true
    key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-05-30 21:14:24.068  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-05-30 21:14:24.071  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-05-30 21:14:24.071  INFO 10300 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1622389464066
2021-05-30 21:14:26.079  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:26.079  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected
2021-05-30 21:14:28.182  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node -1 (/127.0.0.1:9092) could not be established. Broker may not be available.
2021-05-30 21:14:28.182  WARN 10300 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected

Control Center is also accessible (screenshots not preserved).

KafkaProducerService.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class KafkaProducerService {

    @Value("${topic.name}")
    private String TOPIC;

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    public void sendMessage(Integer key, String value) {
        log.info(String.format("Producing Message- Key: %d, Value: %s", key, value));
        kafkaTemplate.send(TOPIC, key, value);
    }
}

HelloWorldKafkaApplication.java

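import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication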
public class HelloWorldKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldKafkaApplication.class, args);
    }
    
    @Autowired
    private KafkaProducerService producerService;

    @Override
    public void run(String... args) throws Exception {
        producerService.sendMessage(1, "{\"name\": \"John\", \"age\": 48}" );
        producerService.sendMessage(1, "{\"name\": \"Harshita\", \"age\": 29}" );
        producerService.sendMessage(1, "{\"name\": \"Laxmi\", \"age\": 63}" );
    }

}

application.yml

spring:
  kafka:
    producer:
      bootstrap-servers: 127.0.0.1:9092
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
  name: users

server.properties (screenshot not preserved)

Elveraelves answered 30/5, 2021 at 15:52 Comment(3)
"installed ubunt 14.04 LTS on windows" - What does this mean? WSL2 and Windows have different network address spaces, so "localhost" will not refer to WSL2. - Barratry
OK, how can we solve this issue now? The code runs on Windows and Kafka runs on WSL2. - Elveraelves
That's correct. See if this works using port 9092 https://mcmap.net/q/151279/-connecting-to-wsl2-server-via-local-network-closed or just run your code in WSL2. - Barratry

As per guidance from @OneCricketeer and @manishKumarPandey, use the command below to forward localhost to the WSL2 IP address, which changes on every machine restart.

C:\WINDOWS\system32>netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=<IP OF YOUR WSL2>
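
Once the port proxy is in place, a plain TCP probe from the Windows side can confirm that something is listening on the forwarded port before the Spring Boot application is started. The following is a minimal sketch, not part of the original answer: the class name `BrokerProbe` and the 2 second timeout are illustrative assumptions. A successful connect only shows that the port is reachable; the Kafka client may still fail afterwards if the broker advertises an address that Windows cannot reach.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not from the original answer): checks plain TCP reachability only.
class BrokerProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host all count as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the bootstrap address from the question; override via arguments.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

If the probe reports the port as unreachable, the port proxy rule or the WSL2 IP address is the first thing to re-check.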
Elveraelves answered 31/5, 2021 at 8:13 Comment(0)

I had the same problem on Windows when trying to connect to a Kafka container running on localhost from another Docker container running on localhost. I fixed it using the host.docker.internal feature.

So in my Kafka docker-compose file I changed

environment:
  ...
  #- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
  - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092

and for client application I use

bootstrapServers="host.docker.internal:9092"
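
The change above matters because the broker hands its advertised address back to clients, and a loopback address such as 127.0.0.1 points at the client's own container rather than at the broker. As a rough sketch (the class `AdvertisedListeners` and its methods are hypothetical helpers, not a Kafka API), an advertised-listeners string can be split up and checked for loopback hosts like this. The loopback check is textual only: it performs no DNS lookup and does not cover IPv6.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for inspecting an advertised-listeners value; not part of Kafka.
class AdvertisedListeners {

    /** Parses "NAME://host:port,NAME2://host2:port2" into listener name -> "host:port". */
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int separator = trimmed.indexOf("://");
            if (separator < 0) {
                throw new IllegalArgumentException("Expected NAME://host:port but got: " + trimmed);
            }
            result.put(trimmed.substring(0, separator), trimmed.substring(separator + 3));
        }
        return result;
    }

    /** Rough textual check: true for "localhost" and hosts starting with "127.". */
    static boolean isLoopback(String hostAndPort) {
        int colon = hostAndPort.lastIndexOf(':');
        String host = colon >= 0 ? hostAndPort.substring(0, colon) : hostAndPort;
        return host.equals("localhost") || host.startsWith("127.");
    }

    public static void main(String[] args) {
        // Default is the commented-out value from the compose snippet; pass another value to compare.
        String value = args.length > 0 ? args[0] : "PLAINTEXT://127.0.0.1:9092";
        parse(value).forEach((name, address) ->
                System.out.println(name + " -> " + address
                        + (isLoopback(address) ? " (loopback, not reachable from another container)" : "")));
    }
}
```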
Stygian answered 24/6, 2022 at 7:37 Comment(0)

First, make sure that the Kafka broker is available; I use kcat from a Mac for this. If the broker responds, the problem is most likely that your Kafka producer (the Spring Boot/Java side) is not able to reach the broker. Double-check that your docker-compose configuration is right, for example:

  ...
  kafka-broker-1:
    image: confluentinc/cp-kafka:${KAFKA_VERSION}
    hostname: kafka-broker-1
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-broker-1:9092,LISTENER_LOCAL://localhost:19092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_COMPRESSION_TYPE: producer
    networks:
      - ${GLOBAL_NETWORK:-kafka}
  ...

I use KAFKA_ADVERTISED_LISTENERS with two different ports because you will be accessing the broker from outside the Docker container. If the configuration looks fine, double-check that your Java code has no DI issue: on occasion the application runs and mvn install succeeds even though Spring is not able to inject a bean. If you pull the Kafka configuration from your properties, chances are you have a bean declared to map them. If so, debug and make sure the properties are being read and successfully passed to DefaultKafkaProducerFactory(Map<String, Object> configs) {...}, which you might need to pass to your KafkaTemplate.
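
One way to make a missing property visible early is to fail fast while building the configuration map, before it ever reaches the producer factory. The sketch below uses only the standard library; `ProducerConfigCheck` and `producerConfigs` are hypothetical names, and the serializer classes are simply the ones shown in the question's log. In a Spring application the returned map would be the kind of `Map<String, Object>` handed to `DefaultKafkaProducerFactory`; that call is left out here so the example runs without Spring or Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the producer config map and rejects a missing bootstrap address.
class ProducerConfigCheck {

    static Map<String, Object> producerConfigs(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            // A null or blank value usually means the property was never loaded or injected.
            throw new IllegalStateException("bootstrap.servers is empty; check that the property is loaded");
        }
        Map<String, Object> configs = new HashMap<>();
        // Plain Kafka config keys, as they appear in the ProducerConfig log output.
        configs.put("bootstrap.servers", bootstrapServers.trim());
        configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    public static void main(String[] args) {
        // Printing the map at startup shows exactly which values would reach the producer.
        String bootstrap = args.length > 0 ? args[0] : "localhost:19092";
        System.out.println(producerConfigs(bootstrap));
    }
}
```

Logging or printing this map at startup makes it easy to compare the values the application actually uses with the ones in the properties file.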

Viscid answered 7/10, 2023 at 1:19 Comment(0)
