Why does my consumer hang while consuming messages from Kafka on DC/OS using the Client API for Java?
I installed Kafka on a DC/OS (Mesos) cluster on AWS, enabled three brokers, and created a topic called "topic1":

dcos kafka topic create topic1 --partitions 3 --replication 3

Then I wrote a Producer class to send messages and a Consumer class to receive them.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {
    public static void sendMessage(String msg) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<>();
        System.out.println("setting Producerconfig.");
        producerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");

        ByteArraySerializer serializer = new ByteArraySerializer();
        System.out.println("Creating KafkaProcuder");
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<>(producerConfig, serializer, serializer);
        for (int i = 0; i < 100; i++) {
            String msgstr = msg + i;
            byte[] message = msgstr.getBytes();
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("topic1", message);
            System.out.println("Sent:" + msgstr);
            kafkaProducer.send(record);
        }
        kafkaProducer.close();
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        sendMessage("Kafka test message 2/27 3:32");
    }

}

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class Consumer {
    public static void getMessage() {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", 
                "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
        consumerConfig.put("group.id", "dj-group");
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.offset.reset", "earliest");
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(consumerConfig, deserializer, deserializer);

        kafkaConsumer.subscribe(Arrays.asList("topic1"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(100);
            System.out.println(records.count() + " records received.");
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(Arrays.toString(record.value()));
            }
        }
    }

    public static void main(String[] args) {
        getMessage();
    }
}

First I ran the Producer on the cluster to send messages to topic1. However, when I ran the Consumer, it couldn't receive anything; it just hung.

The Producer is working, since I was able to get all the messages by running the console-consumer shell script that came with the Kafka install:

./bin/kafka-console-consumer.sh --zookeeper master.mesos:2181/dcos-service-kafka --topic topic1 --from-beginning

But why can't I receive anything with my Consumer? This post suggests that a group.id with an old offset might be a possible cause. I only set group.id in the consumer, not the producer. How do I configure the offset for this group?

Deangelis asked 27/2, 2017 at 21:52
Subotica: To make sure the group.id is not an issue, use kafkaConsumer.seekToBeginning().
Deangelis: @MatthiasJ.Sax Should I delete consumerConfig.put("auto.offset.reset", "earliest"); then? Where should I add this line, after subscribing? Still not getting anything after adding it.
Vivisectionist: Try using a longer timeout when invoking poll. Are there any exceptions thrown in the logs on the server/client side?
Subotica: It depends on your overall pattern. "auto.offset.reset" is only applied if no committed offsets are found, while seekToBeginning() ignores any committed offsets. Thus, on startup you might get different behavior depending on whether there are committed offsets or not. For example, in a container environment, if a container fails and gets restarted, you might want to resume where you left off; in that case seekToBeginning() might not be the behavior you want.
Deangelis: @Vivisectionist Changed the poll timeout to 1000 and still the same. No exception is thrown. Nothing gets printed out. It just hangs.
Subotica: How long did you wait for the consumer to read data? Do you call poll() over and over again (your code seems to call poll only once)? Maybe decreasing metadata.max.age.ms helps.
Deangelis: @MatthiasJ.Sax I am writing a test to check whether Kafka works. All it does is send a number of messages and check that it receives that many messages. Therefore, I need to resume from the last committed offset and only poll once.
Subotica: But how can you be sure that a single poll() returns all the messages you wrote? There is no guarantee about how many messages will be returned (as far as I know).
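
Pulling the comment thread together: below is a minimal sketch (not from the original post) of how the Consumer loop could be restructured for such a test. It assumes a client version (0.10.1+) where seekToBeginning() takes a collection; the expected count of 100 matches the Producer above, and the 30-second budget is illustrative.

    // Force a partition assignment, rewind to the beginning, then poll in a
    // loop: a single poll() may return only part of the available records.
    kafkaConsumer.subscribe(Arrays.asList("topic1"));
    kafkaConsumer.poll(0);                                      // triggers partition assignment
    kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());  // ignore committed offsets

    int received = 0;
    long deadline = System.currentTimeMillis() + 30_000;        // illustrative test budget
    while (received < 100 && System.currentTimeMillis() < deadline) {
        ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(1000);
        received += records.count();
    }
    System.out.println("Received " + received + " records.");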

As it turns out, kafkaConsumer.subscribe(Arrays.asList("topic1")); is causing poll() to hang. According to Kafka Consumer does not receive messages, there are two ways to connect to a topic: assign and subscribe. After I replaced subscribe with the lines below, it started working.

    TopicPartition tp = new TopicPartition("topic1", 0);
    List<TopicPartition> tps = Arrays.asList(tp);
    kafkaConsumer.assign(tps);

However, the output shows arrays of numbers, which is not expected (the Producer sent Strings). But I guess this is a separate issue.

Deangelis answered 28/2, 2017 at 5:6
Guardian: The so-called "separate issue" is that you're receiving raw bytes (Kafka moves bytes under the hood). You should use a deserializer, e.g. key.deserializer=org.apache.kafka.common.serialization.StringDeserializer for keys and a separate one for values. See kafka.apache.org/documentation (but I can't find the exact page for SerDe).
Deangelis: @JacekLaskowski Thanks for the explanation. Saved me another post.
Coquet: It may still hang with assign if your topic has multiple partitions. Fix this by adding every partition to the list, not just 0, after finding out how many there are with ./kafka-topics --describe.
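
Pulling the last two comments together: a sketch (not from the original answer) that discovers every partition with partitionsFor() instead of hard-coding partition 0, and uses StringDeserializer so record values print as text. It assumes the same topic and brokers as the question; imports for TopicPartition, PartitionInfo, and StringDeserializer are omitted for brevity.

    // Assign all partitions of the topic, not just partition 0, and use
    // String deserializers so values print as text instead of byte arrays.
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers",
            "172.16.20.207:9946,172.16.20.234:9125,172.16.20.36:9636");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
            config, new StringDeserializer(), new StringDeserializer());

    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo info : consumer.partitionsFor("topic1")) {
        partitions.add(new TopicPartition("topic1", info.partition()));
    }
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);   // start from offset 0 on every partition

    for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
        System.out.println(record.value()); // prints the original String
    }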

Make sure you gracefully shut down your consumer:

consumer.close()
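
One common pattern (a sketch, assuming a long-running poll loop like the one in the question) is a try/finally block, so close() runs even if polling throws; the broker then reassigns the consumer's partitions immediately instead of waiting for the session timeout:

    // close() tells the group coordinator this consumer is leaving, so its
    // partitions are reassigned right away rather than after a timeout.
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
            consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    try {
        consumer.subscribe(Arrays.asList("topic1"));
        while (running) {                  // 'running' is an illustrative shutdown flag
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            // process records ...
        }
    } finally {
        consumer.close();                  // always runs, even on exceptions
    }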

TL;DR

When you have two consumers running with the same group id, Kafka won't assign the same partition of your topic to both.

If you repeatedly run an app that spins up a consumer with the same group id and you don't shut the old consumers down gracefully, Kafka will take a while to consider a consumer from an earlier run dead and to reassign its partition to a new one.

If new messages arrive on that partition and it's never assigned to your new consumer, the consumer will never see them.

To debug, check:

  • How many partitions your topic has:
    ./kafka-topics --zookeeper <host:port> --describe --topic <topic>
  • How far your group has consumed in each partition:
    ./kafka-consumer-groups --bootstrap-server <host:port> --describe --group <group-id>

If your partitions are already stuck on stale consumers, either wipe the state of your Kafka cluster or use a new group id.
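
For instance, a fresh group id has no committed offsets, so auto.offset.reset applies and the consumer starts from the earliest available message (the timestamp suffix below is just one illustrative way to get a new id on every run):

    // A new group id means no committed offsets, so auto.offset.reset kicks in.
    consumerConfig.put("group.id", "dj-group-" + System.currentTimeMillis());
    consumerConfig.put("auto.offset.reset", "earliest");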

Coquet answered 10/9, 2018 at 13:28
