Kafka consumer doesn't consume messages from existing topic

I have Confluent Kafka installed and running on Docker. The topic has 10 partitions. The problem is that I cannot consume messages from that topic, but I can produce messages to it. I am trying to consume from the topic using the C# Confluent.Kafka driver 1.5.1 (latest) with librdkafka 1.5.0 (latest).

The docker-compose file I start Kafka with is the following:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"    
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
 
  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"

My consumer configuration in C# is the following:

using System;
using System.Collections.Generic;
using Confluent.Kafka;

var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
    {
        { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
        { "group.id", "some-test-group" },
        { "auto.offset.reset", "latest"},
        { "compression.codec", "gzip" },
        { "enable.auto.commit", "false" }
    }).Build();

consumer.Subscribe("some-test-topic");

while (true)
{
    var cr = consumer.Consume(30_000);
    if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
    {
        Console.WriteLine("that's it");
        break;
    }

    Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
}

I'm sure there are messages in the topic's partitions, as I can examine the topic using Kafka Tool 2.0 (see also the watermark-offset check sketched below):

[screenshot: Kafka Tool showing the topic's partitions and their messages]

The configuration I used for Kafka Tool:

[screenshots: Kafka Tool connection settings]
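
For what it's worth, the same check can be done programmatically by querying each partition's watermark offsets with the Confluent.Kafka client. Below is a rough sketch (the "offset-check" group id is just a throwaway name for this check):

using System;
using Confluent.Kafka;

class WatermarkCheck
{
    static void Main()
    {
        // Look up the topic's partitions, then print the low/high watermark of each one.
        // A partition contains messages whenever high > low.
        using (var admin = new AdminClientBuilder(
                   new AdminClientConfig { BootstrapServers = "localhost:3002" }).Build())
        using (var consumer = new ConsumerBuilder<Ignore, Ignore>(
                   new ConsumerConfig { BootstrapServers = "localhost:3002", GroupId = "offset-check" }).Build())
        {
            var metadata = admin.GetMetadata("some-test-topic", TimeSpan.FromSeconds(10));
            foreach (var partition in metadata.Topics[0].Partitions)
            {
                var tp = new TopicPartition("some-test-topic", partition.PartitionId);
                var watermarks = consumer.QueryWatermarkOffsets(tp, TimeSpan.FromSeconds(10));
                Console.WriteLine($"partition {partition.PartitionId}: low={watermarks.Low}, high={watermarks.High}");
            }
        }
    }
}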

I'm pretty sure I have missed something in the config file, but after 2 days of reading documentation and banging my head against the wall I still cannot find the issue. Can anyone help?

Feldman answered 23/8, 2020 at 10:16 Comment(1)
Could you please attach your Kafka consumer logs? – Calculate

The issue is with the number of alive brokers and the replication factor of the offsets topic. I used your docker-compose file to deploy Kafka, connected to the broker container to check the logs, and found messages like this:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

To solve this problem I had to add `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1` to the broker configuration, so my broker service config looks like this:

broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Note that this configuration also advertises the listener as PLAINTEXT://localhost:3002, which matches the bootstrap.servers address your consumer uses from the host. After restarting the broker I was able to produce and consume messages.
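
To double-check the fix end to end, a minimal produce-then-consume round trip along these lines should work (just a sketch using the Confluent.Kafka 1.5.x client; the topic, group id and port are the ones from your question):

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class RoundTripCheck
{
    static async Task Main()
    {
        // Produce one message to the topic.
        var producerConfig = new ProducerConfig { BootstrapServers = "localhost:3002" };
        using (var producer = new ProducerBuilder<string, string>(producerConfig).Build())
        {
            await producer.ProduceAsync("some-test-topic",
                new Message<string, string> { Key = "check", Value = "hello" });
        }

        // Consume it back; "earliest" makes sure messages produced before the consumer started are read too.
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = "localhost:3002",
            GroupId = "some-test-group",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false
        };
        using (var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build())
        {
            consumer.Subscribe("some-test-topic");
            var cr = consumer.Consume(TimeSpan.FromSeconds(30));
            Console.WriteLine(cr == null ? "nothing consumed" : $"{cr.Message.Key}: {cr.Message.Value}");
        }
    }
}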

Etymologize answered 26/8, 2020 at 19:51 Comment(0)

You need to set `auto.offset.reset` to `earliest`, or produce messages to the topic while your consumer is running.
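
With the dictionary-style configuration from the question, that means changing only the auto.offset.reset entry, roughly like this (keep in mind auto.offset.reset only takes effect for partitions where the consumer group has no committed offset yet):

using System.Collections.Generic;
using Confluent.Kafka;

var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
    {
        { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
        { "group.id", "some-test-group" },
        { "auto.offset.reset", "earliest" },  // changed from "latest"
        { "compression.codec", "gzip" },
        { "enable.auto.commit", "false" }
    }).Build();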

Warhol answered 23/8, 2020 at 21:53 Comment(2)
I have changed auto.offset.reset to earliest but the issue still persists. – Feldman
If you use 'latest' you will only consume messages which were produced after the consumer was started. – Etymologize
