kafka-consumer-api Questions

2

Solved

I tried to get the offset from a Kafka topic based on a timestamp, but when I ran it, it threw a null pointer error: Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();...
Locus asked 25/6, 2019 at 13:0

4

I am using a Kafka producer-consumer model in my framework. Each record consumed at the consumer end is later indexed into Elasticsearch. Here I have a use case where, if ES is down, I will...
Cuspid asked 27/9, 2017 at 11:49

7

I am having trouble getting KafkaConsumer to read from the beginning, or from any other explicit offset. Running the command line tools for the consumer for the same topic, I do see messages...
Violaceous asked 5/2, 2016 at 6:28

7

Solved

I am using Kafka Server 0.9 with consumer kafka-client version 0.9 and kafka-producer 0.8.2. Everything is working great except I am getting a lot of info that the coordinator is dead on the consum...
Sheared asked 25/2, 2016 at 19:39

3

I'm writing a Kafka consumer. I need to pass a topic name from an environment variable to @KafkaListener(topics = ...). This is what I have tried so far: import org.springframework.beans.factory.annotat...
Bellarmine asked 4/1, 2019 at 12:23

1

Solved

I am trying to read data from Kafka, as you can see: var config = new ConsumerConfig { BootstrapServers = ""*******, GroupId = Guid.NewGuid().ToString(), AutoOffsetReset = AutoOffsetRe...
Colza asked 26/2, 2022 at 12:0

4

Solved

I have implemented a spring-kafka consumer application. I want the consumer application to shut down gracefully. Currently the consumer application is terminated with the Linux command kill -9 pid. I am using...
Ruebenrueda asked 7/5, 2020 at 8:55

4

I am using dockerized Kafka and have written a Kafka consumer program. It works perfectly when I run Kafka in Docker and the application on my local machine. But when I configured the local application in...

2

I am using Spring Kafka and facing some errors: Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException. My consumer producer ...
Orchidaceous asked 4/2, 2021 at 9:26

1

I was able to publish my Java bean class as an Avro record to Kafka, but when I try to consume it I get a class cast exception. Why does this occur? Producer: Schema schema = new Schema.Parser().parse(new Fil...
Godly asked 14/4, 2016 at 7:41

4

Solved

We can get every message from Kafka by doing: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Is there a way to get only the last message? EDIT: If yo...
Compel asked 16/10, 2015 at 12:46

3

Solved

I'm using the Kafka console consumer to consume messages from a topic with several partitions: kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic events But it p...
Sphygmo asked 8/6, 2017 at 16:40

3

According to the documentation in the Kafka Javadocs, if I: (1) subscribe to a pattern, and (2) create a topic that matches the pattern, a rebalance should occur, which makes the consumer read from that new topic...
Ekaterinoslav asked 3/8, 2016 at 22:47

2

Solved

I have the following code: class Consumer(val consumer: KafkaConsumer<String, ConsumerRecord<String>>) { fun run() { consumer.seekToEnd(emptyList()) val pollDuration = 30 // seconds ...
Groff asked 3/12, 2021 at 13:21

1

Solved

I have 2 Spring Boot applications. I am trying to configure the application.properties file in such a way that the consumer can receive the message sent by the producer. I want to add that I am using a ...
Stilted asked 2/12, 2021 at 21:37

2

Solved

Something puzzles me when it comes to keeping consumers alive. Let's say I have a topic to which data is constantly being written. But for one hour each day, there are no new messages. If I ...
Dedededen asked 23/5, 2020 at 7:24

6

We have a cluster of 3 ZooKeeper nodes and 7 brokers. Now we have to create a topic and its partitions. But I did not find any formula to decide how many partitions I should cr...
Refreshment asked 10/5, 2018 at 11:16

0

Runtime stack trace: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or th...
Jeanniejeannine asked 11/11, 2021 at 5:14

2

Solved

I am new to the Kafka Java API and I am working on consuming records from a particular Kafka topic. I understand that I can use the subscribe() method to start polling records from the topic. Kafka also...
Ify asked 26/12, 2018 at 23:13

4

Solved

I am writing a Python Kafka consumer (trying to use kafka.consumer.SimpleConsumer or kafka.consumer.simple.SimpleConsumer in http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html). W...
Woofer asked 5/8, 2015 at 19:27

3

Solved

If I have enable.auto.commit=false and I call consumer.poll() without calling consumer.commitAsync() afterwards, why does consumer.poll() return new records the next time it's called? Since I did no...
Orizaba asked 19/4, 2017 at 17:18

3

I am getting "Error: Executing consumer group command failed due to null" when running the following command: ./kafka-consumer-groups.bat --zookeeper 192.168.99.101:2181 --describe --group console-...
Clarence asked 7/3, 2018 at 6:42

5

Solved

I want to implement a delayed consumer using the high-level consumer API. Main idea: produce messages by key (each message contains a creation timestamp); this makes sure that each partition has ordered ...
Wakeless asked 2/8, 2015 at 18:10

2

I use the Confluent Kafka .NET library, version 1.2.1. I have implemented a consumer to consume messages from a topic. The problem is that the Consume method blocks the main thread and keeps waiting until a messag...

3

Solved

I am using Kafka (kafka-python) version 3.0.0-1.3.0.0.p0.40. I need to configure the consumer for the topic 'simulation' in Python. When I don't specify the group_id, i.e. group_id = None, it recei...
Didynamous asked 21/9, 2018 at 0:48

© 2022 - 2025 — McMap. All rights reserved.