If my producer is producing, why can't the consumer consume? It gets stuck at poll()

I'm publishing to a remote Kafka server and trying to consume messages from that same remote server (Kafka 0.9.0.1). Publishing works fine, but consuming does not.

Publisher

package org.test;

import java.io.IOException;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;


public class Producer {

    private void generateMessages() throws IOException {
        String topic = "MY_TOPIC";

        Properties props = new Properties();

        props.put("bootstrap.servers", "kafka.xx.com:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");


        KafkaProducer<String, String> producer = null;
        try {
            producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(topic, "test msg"));
                System.out.println("producing---");
            }

        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("Error in publishing messages to the topic : " + topic);

        } finally {
            if (producer != null) producer.close();
        }
    }

    public static void main(String[] args) throws IOException {
        Producer producer = new Producer();
        producer.generateMessages();
        System.out.println("$$$$$");
    }
}

I can see "producing--- and $$$$ prints. But when i try to consume, i do not see "polling " print messages.. It got stuck at poll(timeout).

Any clue?

Consumer

package org.test;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;


public class Listener {

    public void start() throws Exception {

        String topic = "MY_TOPIC";

        List<String> topics = Arrays.asList(topic);

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.xx.com:9092");
        props.put("enable.auto.commit", true);
        props.put("receive.buffer.bytes", 262144);
        props.put("consumer.timeout.ms", 10000);
        props.put("session.timeout.ms", 7000);
        props.put("heartbeat.interval.ms", 1000);
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", "test");
        props.put("fetch.min.bytes", 1);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("serializer.class", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);

        try {
            while (true) {

                ConsumerRecords<String, String> records = consumer.poll(100);
                System.out.println("polling msges : " + records.count());
                for (ConsumerRecord<String, String> record : records) {
    System.out.println("kafka record : " + record.value());
                }
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("eror in polling");
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) throws Exception {

        Listener listener = new Listener();
        listener.start();

    }
}
Chandless answered 4/4, 2016 at 2:49 Comment(6)
What do you get if you do kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC?Studer
@LucianoAfranllie I get continuous warnings: [2016-04-06 08:44:21,664] WARN Error while fetching metadata with correlation id 1 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient) [2016-04-06 08:44:21,889] WARN Error while fetching metadata with correlation id 3 : {MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)Chandless
@LucianoAfranllie I tried without the "?", like kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC. It got stuck, so I see my original behaviour here: with the new consumer API, poll() gets stuck.Chandless
Try with the parameter --from-beginning: kafka-console-consumer.sh --new-consumer --bootstrap-server kafka.xx.com:9092 --topic MY_TOPIC --from-beginningMichaelson
In your Java code, I guess it should be: props.put("auto.offset.reset", "smallest"); (and not "earliest")Michaelson
@MatthiasJ.Sax The latest consumer uses the earliest/latest values and the older consumer uses smallest. Thanks for the help. I could not figure out the issue and gave up on the new consumer as I ran out of time; I moved back to the older consumer. Please check my actual issues on the Kafka user list: "Why my consumer does not print any messages? help", "Consumer thread is waiting forever, not returning any objects" and "Is there any behavioural change to connect local server and remote server?"Chandless

Disclaimer: I don't know Kafka but I do know messaging.

First thing about topics: by default, subscriptions are not persistent, so if a producer sends messages to the topic and there's no one listening, the messages are dropped.

Second thing is the polling: you're polling for 100 ms, and if there's nothing there, an exception is thrown, dropping you out of the loop.

If there are no messages when the consumer starts (because, as I described, the producer's messages went to the bit bucket), then the consumer fails because there is nothing to consume. So I would say everything is working as you should expect.

Two options:

- Add a much larger initial poll timeout to give the consumer a chance to see a message (assuming you know the producer is going to produce within that time frame).
- Change your logic so that an exception keeps you in the while loop and you continue to consume, and figure out a different way to stop the consumer.
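
Here is a minimal sketch of both adjustments, assuming the same broker (kafka.xx.com:9092), topic (MY_TOPIC) and group id (test) as in the question; the class name and the stop flag are made up for illustration:

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PatientListener {

    // Hypothetical shutdown flag, flipped by stop() instead of breaking out on errors.
    private final AtomicBoolean running = new AtomicBoolean(true);

    public void start() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka.xx.com:9092");
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("MY_TOPIC"));
        try {
            while (running.get()) {
                try {
                    // A much larger timeout gives the first poll time to join the
                    // group and get a partition assignment before returning.
                    ConsumerRecords<String, String> records = consumer.poll(10000);
                    System.out.println("polled " + records.count() + " records");
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("kafka record : " + record.value());
                    }
                } catch (Exception e) {
                    // Log and stay in the loop instead of dropping out of it.
                    e.printStackTrace();
                }
            }
        } finally {
            consumer.close();
        }
    }

    // Call from another thread (or a shutdown hook) to end the poll loop.
    public void stop() {
        running.set(false);
    }
}

The idea is simply that a slow first poll and a catch inside the loop keep the consumer alive long enough to see whatever the producer sends.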

Peon answered 5/4, 2016 at 1:33 Comment(7)
First thing: I started the listener first, then the producer, so I believe this is not an issue. Second thing: no exception; I'll try further and see. The thing is, the same consumer and producer work fine with a local Kafka server, just not with the remote Kafka server. And the most frustrating part is that it does not throw a single exception to the consumer.Chandless
You didn't initially say that it worked locally, which changes my thoughts. Also, starting the listener first is going to run into the problem I described, where the poll times out and throws an exception before the producer gets going. The only other thing I can think of is that there's a timing difference when going remote that exacerbates the problem, or there's some magic when working with a local server that changes behavior.Peon
The network connection is stable (the producer produces messages quickly). My bad luck; this is not working. Clueless.Chandless
What happened when you rewrote the consumer to catch the exception within the loop?Peon
No exception, so it is not falling into the catch block.Chandless
This statement is wrong: "First thing about topics: by default, subscriptions are not persistent, so if a producer sends messages to the topic and there's no one listening, the messages are dropped." -> Kafka is a persistent message queue and stores data reliably (using a configurable delete policy, e.g., by retention time or max storage size).Michaelson
This is wrong, too: "Second thing is the polling: you're polling for 100ms and then if there's nothing there an exception is thrown, dropping you out of the loop." -> In Kafka, if no message is available, poll just returns an empty set of records.Michaelson
