Kafka in Kubernetes - Marking the coordinator dead for group
I am pretty new to Kubernetes and wanted to set up Kafka and ZooKeeper with it. I was able to set up Apache Kafka and ZooKeeper in Kubernetes using StatefulSets; I followed this and this to build my manifest files. I made one replica each of Kafka and ZooKeeper and also used persistent volumes. All pods are running and ready.

I then tried to expose Kafka externally with a Service of type NodePort on port 30010, expecting that clients outside the cluster could produce messages to the broker and consume from it.

But when I created a consumer in my Java application with the bootstrap server set to <ip-address>:30010, the following logs were displayed:

INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener
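
For reference, the consumer configuration amounts to little more than the bootstrap server and the group id, roughly like this (a sketch assuming Spring Boot with spring-kafka, which the stack traces in the edit below suggest; the actual project may wire this up differently):

# application.yml of the Java consumer (hypothetical layout)
spring:
  kafka:
    bootstrap-servers: <ip-address>:30010   # the NodePort exposed by the kafka-cs Service
    consumer:
      group-id: workerListener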

Interestingly, when I tested the cluster using kubectl commands, I was able to produce and consume messages:

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 produce --restart=Never --rm \
 -- kafka-console-producer.sh --topic test --broker-list kafka-0.kafka-hs.default.svc.cluster.local:9093 done;

kubectl run -ti --image=gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 consume --restart=Never --rm -- kafka-console-consumer.sh --topic test --bootstrap-server kafka-0.kafka-hs.default.svc.cluster.local:9093

Can someone point me in the right direction as to why the coordinator is being marked as dead?

kafka.yml

apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  labels:
    app: kafka
spec:
  ports:
  - port: 9093
    name: server
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-cs
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30010
    protocol: TCP
  selector:
    app: kafka
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-hs
  replicas: 1
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - kafka
              topologyKey: "kubernetes.io/hostname"
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values:
                        - zk
                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zk-cs.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=6000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: kafka-pv-volume
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
           command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        runAsUser: 0
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: kafka-pv-volume
    spec:
      storageClassName: manual
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi

zookeeper.yml

apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  ports:
  - port: 2181
    name: client
  selector:
    app: zk
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  selector:
    matchLabels:
      app: zk
  serviceName: zk-hs
  replicas: 1
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10"
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=1 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 10
          timeoutSeconds: 5
        volumeMounts:
        - name: pv-volume
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 0
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: pv-volume
    spec:
      storageClassName: manual
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi

EDIT:

I changed the log level to TRACE. These are the logs I got:

2018-01-11 18:56:24,617 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 3, received {brokers=[{node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093,rack=null}],cluster_id=LwSLmJpTQf6tSKPsfvriIg,controller_id=0,topic_metadata=[{topic_error_code=0,topic=mdm.worker.request,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=0,leader=0,replicas=[0],isr=[0]}]}]}
2018-01-11 18:56:24,621 DEBUG o.a.k.c.Metadata - Updated cluster metadata version 2 to Cluster(id = LwSLmJpTQf6tSKPsfvriIg, nodes = [kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)], partitions = [Partition(topic = mdm.worker.request, partition = 0, leader = 0, replicas = [0], isr = [0])])
2018-01-11 18:56:24,622 TRACE o.a.k.c.NetworkClient - Completed receive from node -1, for key 10, received {error_code=0,coordinator={node_id=0,host=kafka-0.kafka-hs.default.svc.cluster.local,port=9093}}
2018-01-11 18:56:24,624 DEBUG o.a.k.c.c.i.AbstractCoordinator - Received GroupCoordinator response ClientResponse(receivedTimeMs=1515678984622, latencyMs=798, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', error=NONE, node=kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null))) for group workerListener
2018-01-11 18:56:24,625 INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) for group workerListener.
2018-01-11 18:56:24,625 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
2018-01-11 18:56:24,633 DEBUG o.a.k.c.NetworkClient - Error connecting to node 2147483647 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:462)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:598)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$GroupCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:579)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
    ... 22 common frames omitted
2018-01-11 18:56:24,634 INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 2147483647 rack: null) dead for group workerListener
2018-01-11 18:56:24,735 TRACE o.a.k.c.NetworkClient - Found least loaded node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
2018-01-11 18:56:24,735 DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending GroupCoordinator request for group workerListener to broker kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null)
2018-01-11 18:56:24,735 DEBUG o.a.k.c.NetworkClient - Initiating connection to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093.
2018-01-11 18:56:24,736 DEBUG o.a.k.c.NetworkClient - Error connecting to node 0 at kafka-0.kafka-hs.default.svc.cluster.local:9093:
java.io.IOException: Can't resolve address: kafka-0.kafka-hs.default.svc.cluster.local:9093
    at org.apache.kafka.common.network.Selector.connect(Selector.java:195)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:762)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:408)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:223)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:214)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:614)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:192)
    ... 15 common frames omitted
2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Removing node kafka-0.kafka-hs.default.svc.cluster.local:9093 (id: 0 rack: null) from least loaded node selection: is-blacked-out: true, in-flight-requests: 0
2018-01-11 18:56:24,737 TRACE o.a.k.c.NetworkClient - Least loaded node selection failed to find an available node
2018-01-11 18:56:24,738 DEBUG o.a.k.c.NetworkClient - Give up sending metadata request since no node is available
Comments:

- What is kafka-0? Service addresses look like my-svc.my-namespace.svc.cluster.local, which in your case would be kafka-hs.default.svc.cluster.local. Why the kafka-0 prefix?
- I don't have experience with Kafka, but when I have to deploy a new service I start from the official Helm chart as a baseline; in this case that would be github.com/kubernetes/charts/tree/master/incubator/kafka. It may be too much if you are new to Kubernetes, but if you have some experience with templates it should be no problem. Note: Helm is the pseudo-official package manager for Kubernetes.
- (OP) kafka-0 is the name of the pod created by the StatefulSet. I'm not sure why the pod name shows up in that address.
- (OP) Thanks for this. I gave the chart a quick look and its listed limitations include "Kafka cluster is not accessible via an external endpoint", but accessing it from the outside world is exactly where I am having trouble.
- Try adding an entry that maps kafka-0.kafka-hs.default.svc.cluster.local to the broker's IP address in your consumer's hosts file.
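
The hosts-file workaround from the last comment would look roughly like this (a sketch; <node-ip> is a placeholder for an address on which the broker's advertised port 9093 is actually reachable, which the NodePort on 30010 alone does not provide):

# /etc/hosts on the machine running the Java consumer
<node-ip>   kafka-0.kafka-hs.default.svc.cluster.local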
Answer:
I had the same problem as you last week and solved it, so it's possible to expose Kafka outside Kubernetes!

Solution: in your Kafka broker config (broker-config.yaml in my setup) you have to map the cluster's external IP to the internal DNS names of the form

kafka-<N>.kafka-hs.default.svc.cluster.local:9093

How To:

Add these lines to your server.properties file:

listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT

If you have an init script that runs before the broker reads server.properties, you should add the following:

# add unique label to each pod
kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}

EXTERNAL_LISTENER_IP=<YOUR_KUBERNETES_CLUSTER_EXTERNAL_IP>
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))

sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties

Otherwise you will need to find another way to replace these configuration values in server.properties at runtime.
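
Either way, for broker kafka-0 the end result in server.properties should look roughly like this (derived from the sed commands above; <EXTERNAL_IP> stands for whatever external address the init script was given):

# server.properties of kafka-0 after the init script has run
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT
listeners=INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093
advertised.listeners=INTERNAL_PLAINTEXT://kafka-0.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT://<EXTERNAL_IP>:30093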

Note that these lines must remain commented out in your server.properties file:

#listeners=PLAINTEXT://:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092

Services: create a headless service for the per-pod DNS records, plus a NodePort service for each broker you have:

# A headless service to create DNS records
---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  ports:
  - port: 9092
  # [podname].broker.kafka.svc.cluster.local
  clusterIP: None
  selector:
    app: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: broker-0
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30093
  selector:
    kafka-set-component: kafka-0
---
apiVersion: v1
kind: Service
metadata:
  name: broker-1
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30094
  selector:
    kafka-set-component: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: broker-2
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30095
  selector:
    kafka-set-component: kafka-2
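
With these Services in place, a client outside the cluster bootstraps against the external IP and the per-broker NodePort, for example (a sketch using the standard Kafka console tools; <EXTERNAL_IP> is the same address used for the EXTERNAL_PLAINTEXT listener):

# produce to and consume from broker-0 through its NodePort
kafka-console-producer.sh --broker-list <EXTERNAL_IP>:30093 --topic test
kafka-console-consumer.sh --bootstrap-server <EXTERNAL_IP>:30093 --topic test --from-beginning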

Notes, if you are running on GKE:

  1. YOUR_KUBERNETES_CLUSTER_EXTERNAL_IP, declared in the init script above, can be found via gcloud compute instances list
  2. You must also open the NodePorts in the firewall: gcloud compute firewall-rules create kafka-external --allow tcp:30093,tcp:30094,tcp:30095
Comments:

- (OP) Thanks for the detailed response, but can you tell me what to change in my kafka.yml manifest? I don't have a server.properties file, though I do override some of its values in the manifest. Can you point out which values to override?
- (answerer) You're welcome. Every Kafka broker has a server.properties; you are overriding some of its properties in your YAML. You can add your own server.properties to the configuration and then follow the answer above.
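
To stay with the --override style already used in kafka.yml instead of shipping a separate server.properties, the listener changes could be sketched roughly as below (untested; the extra listener port 9094, the <node-external-ip> placeholder, and the adjusted kafka-cs Service are assumptions, not part of the answer above). The existing --override listeners=PLAINTEXT://:9093 line would be dropped, and kafka-cs would target port 9094 while keeping nodePort 30010:

# extra --override flags appended to the kafka-server-start.sh command in kafka.yml;
# internal traffic stays on 9093 (headless service, readiness probe), while a new
# external listener on 9094 advertises <node-external-ip> and the NodePort 30010
--override listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT \
--override inter.broker.listener.name=INTERNAL_PLAINTEXT \
--override listeners=INTERNAL_PLAINTEXT://0.0.0.0:9093,EXTERNAL_PLAINTEXT://0.0.0.0:9094 \
--override advertised.listeners=INTERNAL_PLAINTEXT://${HOSTNAME}.kafka-hs.default.svc.cluster.local:9093,EXTERNAL_PLAINTEXT://<node-external-ip>:30010 \

With that in place the Java consumer keeps bootstrapping against <node-external-ip>:30010, which now matches the address the broker advertises on its external listener.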
